This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 73002588e4 [INLONG-774][Sort] Add Doc for MySQL CDC support read data 
from specific timestamp / earliest offset / specific offset (#775)
73002588e4 is described below

commit 73002588e496addfd69ad3bcc0cf6f68b8ebb717
Author: emhui <[email protected]>
AuthorDate: Mon Jun 19 10:06:59 2023 +0800

    [INLONG-774][Sort] Add Doc for MySQL CDC support read data from specific 
timestamp / earliest offset / specific offset (#775)
---
 docs/data_node/extract_node/mysql-cdc.md           | 597 +++++----------------
 .../current/data_node/extract_node/mysql-cdc.md    | 596 +++++---------------
 2 files changed, 243 insertions(+), 950 deletions(-)

diff --git a/docs/data_node/extract_node/mysql-cdc.md 
b/docs/data_node/extract_node/mysql-cdc.md
index 0304c7d088..2700539f9b 100644
--- a/docs/data_node/extract_node/mysql-cdc.md
+++ b/docs/data_node/extract_node/mysql-cdc.md
@@ -118,296 +118,60 @@ TODO: It will be supported in the future.
 
 ## MySQL Extract Node Options
 
-<div class="highlight">
-<table class="colwidths-auto docutils">
-    <thead>
-      <tr>
-        <th class="text-left" style={{width: '10%'}}>Option</th>
-        <th class="text-left" style={{width: '8%'}}>Required</th>
-        <th class="text-left" style={{width: '7%'}}>Default</th>
-        <th class="text-left" style={{width: '10%'}}>Type</th>
-        <th class="text-left" style={{width: '65%'}}>Description</th>
-      </tr>
-    </thead>
-    <tbody>
-    <tr>
-      <td>connector</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Specify what connector to use, here should be 
<code>'mysql-cdc-inlong'</code>.</td>
-    </tr>
-    <tr>
-      <td>hostname</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>IP address or hostname of the MySQL database server.</td>
-    </tr>
-    <tr>
-      <td>username</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Name of the MySQL database to use when connecting to the MySQL 
database server.</td>
-    </tr>
-    <tr>
-      <td>password</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Password to use when connecting to the MySQL database server.</td>
-    </tr>
-    <tr>
-      <td>database-name</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Database name of the MySQL server to monitor. The database-name also 
supports regular expressions to monitor multiple tables matches the regular 
expression.</td>
-    </tr> 
-    <tr>
-      <td>table-name</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Table name of the MySQL database to monitor. The table-name also 
supports regular expressions to monitor multiple tables matches the regular 
expression.</td>
-    </tr>
-    <tr>
-      <td>port</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>3306</td>
-      <td>Integer</td>
-      <td>Integer port number of the MySQL database server.</td>
-    </tr>
-    <tr>
-      <td>server-id</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>Integer</td>
-      <td>A numeric ID or a numeric ID range of this database client, The 
numeric ID syntax is like '5400', 
-          the numeric ID range syntax is like '5400-5408', The numeric ID 
range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled.
-          Every ID must be unique across all currently-running database 
processes in the MySQL cluster. This connector joins the MySQL cluster
-          as another server (with this unique ID) so it can read the binlog. 
By default, a random number is generated between 5400 and 6400,
-          though we recommend setting an explicit value.
-      </td>
-    </tr>
-    <tr>
-          <td>scan.incremental.snapshot.enabled</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>true</td>
-          <td>Boolean</td>
-          <td>Incremental snapshot is a new mechanism to read snapshot of a 
table. Compared to the old snapshot mechanism,
-              the incremental snapshot has many advantages, including:
-                (1) source can be parallel during snapshot reading, 
-                (2) source can perform checkpoints in the chunk granularity 
during snapshot reading, 
-                (3) source doesn't need to acquire global read lock (FLUSH 
TABLES WITH READ LOCK) before snapshot reading.
-              If you would like the source run in parallel, each parallel 
reader should have an unique server id, so 
-              the 'server-id' must be a range like '5400-6400', and the range 
must be larger than the parallelism.
-              Please see <a 
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading";>Incremental
 Snapshot Reading</a>section for more detailed information.
-          </td>
-    </tr>
-    <tr>
-          <td>scan.incremental.snapshot.chunk.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>8096</td>
-          <td>Integer</td>
-          <td>The chunk size (number of rows) of table snapshot, captured 
tables are split into multiple chunks when read the snapshot of table.</td>
-    </tr>
-    <tr>
-          <td>scan.snapshot.fetch.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>1024</td>
-          <td>Integer</td>
-          <td>The maximum fetch size for per poll when read table 
snapshot.</td>
-    </tr>
-    <tr>
-      <td>scan.startup.mode</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>initial</td>
-      <td>String</td>
-      <td>Optional startup mode for MySQL CDC consumer, valid enumerations are 
"initial"
-           and "latest-offset". 
-           Please see <a 
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#startup-reading-position";>Startup
 Reading Position</a>section for more detailed information.</td>
-    </tr> 
-    <tr>
-      <td>server-time-zone</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>UTC</td>
-      <td>String</td>
-      <td>The session time zone in database server, e.g. "Asia/Shanghai". 
-          It controls how the TIMESTAMP type in MYSQL converted to STRING.
-          See more <a 
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types";>here</a>.</td>
-    </tr>
-    <tr>
-      <td>debezium.min.row.
-      count.to.stream.result</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>1000</td>
-      <td>Integer</td>
-      <td>During a snapshot operation, the connector will query each included 
table to produce a read event for all rows in that table. This parameter 
determines whether the MySQL connection will pull all results for a table into 
memory (which is fast but requires large amounts of memory), or whether the 
results will instead be streamed (can be slower, but will work for very large 
tables). The value specifies the minimum number of rows a table must contain 
before the connector will strea [...]
-    </tr>
-    <tr>
-          <td>connect.timeout</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>30s</td>
-          <td>Duration</td>
-          <td>The maximum time that the connector should wait after trying to 
connect to the MySQL database server before timing out.</td>
-    </tr>    
-    <tr>
-          <td>connect.max-retries</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>3</td>
-          <td>Integer</td>
-          <td>The max retry times that the connector should retry to build 
MySQL database server connection.</td>
-    </tr>
-    <tr>
-          <td>connection.pool.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>20</td>
-          <td>Integer</td>
-          <td>The connection pool size.</td>
-    </tr>
-    <tr>
-          <td>jdbc.properties.*</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>20</td>
-          <td>String</td>
-          <td>Option to pass custom JDBC URL properties. User can pass custom 
properties like 'jdbc.properties.useSSL' = 'false'.</td>
-    </tr>
-    <tr>
-          <td>heartbeat.interval</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>30s</td>
-          <td>Duration</td>
-          <td>The interval of sending heartbeat event for tracing the latest 
available binlog offsets.</td>
-    </tr>
-    <tr>
-          <td>append-mode</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>Whether to support append only, if true the MySQL Extract Node 
will Convert all upsert streams to append streams to support downstream 
scenarios where upsert streams are not supported.</td>
-    </tr>
-    <tr>
-          <td>migrate-all</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>Whether it is a full database migration scenario, if it is 
'true', MySQL Extract Node will compress the physical fields and other meta 
fields of the table into 'json'.
-              The special 'data' meta field of the format, currently supports 
two data formats, if you need data in 'canal json' format,
-              then use the 'data_canal' metadata field, or use the 
'data_debezium' metadata field if data in 'debezium json' format is 
required.</td>
-    </tr>
-    <tr>
-          <td>row-kinds-filtered</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>The specific operation type that needs to be retained, where +U 
corresponds to the data before the update, -U corresponds to the updated data, 
and +I corresponds to the data before the update.
-              Inserted data (the existing data is the data of the insert 
type), -D represents the deleted data, if you want to keep multiple operation 
types, use & connection.
-              For example +I&-D, the connector will only output the inserted 
and deleted data, and the updated data will not be output. </td>
-    </tr>
-    <tr>
-      <td>debezium.*</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Pass-through Debezium's properties to Debezium Embedded Engine which 
is used to capture data changes from MySQL server.
-          For example: <code>'debezium.snapshot.mode' = 'never'</code>.
-          See more about the <a 
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties";>Debezium's
 MySQL Connector properties</a></td> 
-    </tr>
-    <tr>
-      <td>inlong.metric.labels</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>Inlong metric label, format of value is 
groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId].</td> 
-    </tr>
-    </tbody>
-</table>
-</div>
+|Option| Required| Default| Type| Description|
+| --- | --- | --- | --- | --- |
+| connector | required | (none) | String | Specify which connector to use; here it should be `'mysql-cdc-inlong'`.|
+| hostname | required | (none) | String |      IP address or hostname of the 
MySQL database server. |
+| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server.|
+| password | required | (none) | String |      Password to use when connecting 
to the MySQL database server.|
+| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple databases that match the regular expression.|
+| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that match the regular expression.|
+| port | optional | 3306 | Integer |   Integer port number of the MySQL 
database server.|
+| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'. The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.|
+| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) the source can be parallel during snapshot reading, (2) the source can perform checkpoints in the chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. If you would like the source to run in parallel, each parallel reader should have a unique server ID, so the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. Please see the [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) section for more detailed information.|
+| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when reading the snapshot of a table.|
+| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading the table snapshot.|
+| scan.startup.mode | optional | initial | String |    Optional startup mode 
for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", 
"latest-offset", "specific-offset" and "timestamp". Please see [Startup Reading 
Position](#startup-reading-position) section for more detailed information.|
+| scan.startup.specific-offset.file | optional | (none) | String | Optional binlog file name used in case of "specific-offset" startup mode.|
+| scan.startup.specific-offset.pos | optional | (none) | Long | Optional binlog file position used in case of "specific-offset" startup mode.|
+| scan.startup.specific-offset.gtid-set | optional | (none) | String | Optional GTID set used in case of "specific-offset" startup mode.|
+| scan.startup.specific-offset.skip-events | optional | (none) | Long | Optional number of events to skip after the specific starting offset.|
+| scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset.|
+| server-time-zone | optional | UTC | String | The session time zone in the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types).|
+| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (which can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results.|
+| connect.timeout | optional | 30s | Duration | The maximum time that the 
connector should wait after trying to connect to the MySQL database server 
before timing out.|
+| connect.max-retries | optional | 3 | Integer | The maximum number of retries the connector should attempt when building a connection to the MySQL database server.|
+| connection.pool.size| optional| 20| Integer| The connection pool size.|
+| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. Users can pass custom properties like 'jdbc.properties.useSSL' = 'false'.|
+| heartbeat.interval| optional| 30s| Duration | The interval of sending 
heartbeat event for tracing the latest available binlog offsets.|
+| append-mode | optional | false | Boolean | Whether to support append only. If true, the MySQL Extract Node will convert all upsert streams to append streams to support downstream scenarios where upsert streams are not supported.|
+| migrate-all | optional | false | Boolean | Whether it is a full database migration scenario. If 'true', the MySQL Extract Node will compress the physical fields and other meta fields of the table into the special 'data' meta field in 'json' format. Two data formats are currently supported: if you need data in 'canal json' format, use the 'data_canal' metadata field; use the 'data_debezium' metadata field if data in 'debezium json' format is required.|
+| row-kinds-filtered | optional | false | Boolean | The specific operation types that need to be retained, where -U corresponds to the data before an update, +U corresponds to the updated data, +I corresponds to inserted data (existing data is treated as insert type), and -D represents deleted data. If you want to keep multiple operation types, connect them with &. For example, with +I&-D the connector will only output the inserted and deleted data, and the updated data will not be output.|
+| debezium.* | optional | (none) | String | Pass-through Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties).|
+| inlong.metric.labels | optional | (none) | String | InLong metric label, format of value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
 
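+
+As a quick orientation, the required options above are enough for a minimal table definition. The following sketch is illustrative only; the hostname, credentials, and database/table names are placeholders:
+
+```sql
+-- Minimal MySQL Extract Node using only the required options.
+-- Hostname, credentials, and database/table names are placeholders.
+CREATE TABLE `mysql_extract_minimal` (
+     `id` INT,
+     `name` STRING,
+     PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+     'connector' = 'mysql-cdc-inlong',
+     'hostname' = 'localhost',
+     'username' = 'inlong_user',
+     'password' = 'inlong_pass',
+     'database-name' = 'test',
+     'table-name' = 'demo'
+)
+```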
 
 ## Available Metadata
 
 The following format metadata can be exposed as read-only (VIRTUAL) columns in 
a table definition.
 
-<table class="colwidths-auto docutils">
-  <thead>
-     <tr>
-       <th class="text-left" style={{width: '15%'}}>Key</th>
-       <th class="text-left" style={{width: '30%'}}>DataType</th>
-       <th class="text-left" style={{width: '55%'}}>Description</th>
-     </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>meta.table_name</td>
-      <td>STRING NOT NULL</td>
-      <td>Name of the table that contain the row.</td>
-    </tr>
-    <tr>
-      <td>meta.database_name</td>
-      <td>STRING NOT NULL</td>
-      <td>Name of the database that contain the row.</td>
-    </tr>
-    <tr>
-      <td>meta.op_ts</td>
-      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
-      <td>It indicates the time that the change was made in the database. 
<br/>If the record is read from snapshot of the table instead of the binlog, 
the value is always 0.</td>
-    </tr>
-    <tr>
-      <td>meta.op_type</td>
-      <td>STRING</td>
-      <td>Type of database operation, such as INSERT/DELETE, etc.</td>
-    </tr>
-    <tr>
-      <td>meta.data_canal</td>
-      <td>STRING/BYTES</td>
-      <td>Data for rows in `canal-json` format only exists when the 
`migrate-all` option is 'true'.</td>
-    </tr>
-    <tr>
-      <td>meta.data_debezium</td>
-      <td>STRING/BYTES</td>
-      <td>Data for `debezium-json` formatted lines only exists if the 
`migrate-all` option is 'true'.</td>
-    </tr>
-    <tr>
-      <td>meta.is_ddl</td>
-      <td>BOOLEAN</td>
-      <td>Whether the DDL statement.</td>
-    </tr>
-    <tr>
-      <td>meta.ts</td>
-      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
-      <td>The current time when the row was received and processed.</td>
-    </tr>
-    <tr>
-      <td>meta.sql_type</td>
-      <td>MAP</td>
-      <td>Mapping of sql_type table fields to java data type IDs.</td>
-    </tr>
-    <tr>
-      <td>meta.mysql_type</td>
-      <td>MAP</td>
-      <td>Structure of the table.</td>
-    </tr>
-    <tr>
-      <td>meta.pk_names</td>
-      <td>ARRAY</td>
-      <td>Primay key name of the table.</td>
-    </tr>
-    <tr>
-      <td>meta.batch_id</td>
-      <td>BIGINT</td>
-      <td>Batch id of the Binlog.</td>
-    </tr>
-    <tr>
-      <td>meta.update_before</td>
-      <td>ARRAY</td>
-      <td>Data of the row before update.</td>
-    </tr>
-  </tbody>
-</table>
+| Key | DataType | Description| 
+| ---  | --- | --- | 
+| meta.table_name | STRING NOT NULL | Name of the table that contains the row.|
+| meta.database_name | STRING NOT NULL | Name of the database that contains the row.|
+| meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. <br/>If the record is read from the snapshot of the table instead of the binlog, the value is always 0.|
+| meta.op_type | STRING | Type of database operation, such as INSERT/DELETE, 
etc.|
+| meta.data_canal | STRING/BYTES | Data for rows in `canal-json` format; only exists when the `migrate-all` option is 'true'.|
+| meta.data_debezium | STRING/BYTES | Data for rows in `debezium-json` format; only exists when the `migrate-all` option is 'true'.|
+| meta.is_ddl | BOOLEAN | Whether it is a DDL statement.|
+| meta.ts | TIMESTAMP_LTZ(3) NOT NULL | The current time when the row was received and processed. |
+| meta.sql_type | MAP | Mapping of sql_type table fields to Java data type IDs.|
+| meta.mysql_type | MAP | Structure of the table.|
+| meta.pk_names | ARRAY | Primary key names of the table.|
+| meta.batch_id | BIGINT | Batch id of the Binlog.|
+| meta.update_before | ARRAY | Data of the row before update.|
 
 The extended CREATE TABLE example demonstrates the syntax for exposing these 
metadata fields:
+
 ```sql
 CREATE TABLE `mysql_extract_node` (
      `id` INT,
@@ -440,197 +204,32 @@ CREATE TABLE `mysql_extract_node` (
 
 ## Data Type Mapping
 
-<div class="wy-table-responsive">
-<table class="colwidths-auto docutils">
-    <thead>
-      <tr>
-        <th class="text-left">MySQL type</th>
-        <th class="text-left">Flink SQL type</th>
-        <th class="text-left">NOTE</th>
-      </tr>
-    </thead>
-    <tbody>
-    <tr>
-      <td>TINYINT</td>
-      <td>TINYINT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        SMALLINT<br/>
-        TINYINT UNSIGNED</td>
-      <td>SMALLINT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        INT<br/>
-        MEDIUMINT<br/>
-        SMALLINT UNSIGNED</td>
-      <td>INT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BIGINT<br/>
-        INT UNSIGNED</td>
-      <td>BIGINT</td>
-      <td></td>
-    </tr>
-   <tr>
-      <td>BIGINT UNSIGNED</td>
-      <td>DECIMAL(20, 0)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        REAL<br/>
-        FLOAT<br/>
-        </td>
-      <td>FLOAT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        DOUBLE
-      </td>
-      <td>DOUBLE</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        NUMERIC(p, s)<br/>
-        DECIMAL(p, s)<br/>
-        where p &lt;= 38<br/>
-      </td>
-      <td>DECIMAL(p, s)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        NUMERIC(p, s)<br/>
-        DECIMAL(p, s)<br/>
-        where 38 &lt; p &lt;= 65<br/>
-      </td>
-      <td>STRING</td>
-      <td>The precision for DECIMAL data type is up to 65 in MySQL, but the 
precision for DECIMAL is limited to 38 in Flink.
-  So if you define a decimal column whose precision is greater than 38, you 
should map it to STRING to avoid precision loss.</td>
-    </tr>
-    <tr>
-      <td>
-        BOOLEAN<br/>
-        TINYINT(1)<br/>
-        BIT(1)
-        </td>
-      <td>BOOLEAN</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>DATE</td>
-      <td>DATE</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>TIME [(p)]</td>
-      <td>TIME [(p)]</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>TIMESTAMP [(p)]<br/>
-        DATETIME [(p)]
-      </td>
-      <td>TIMESTAMP [(p)]
-      </td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        CHAR(n)
-      </td>
-      <td>CHAR(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        VARCHAR(n)
-      </td>
-      <td>VARCHAR(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BIT(n)
-      </td>
-      <td>BINARY(⌈n/8⌉)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BINARY(n)
-      </td>
-      <td>BINARY(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        VARBINARY(N)
-      </td>
-      <td>VARBINARY(N)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        TINYTEXT<br/>
-        TEXT<br/>
-        MEDIUMTEXT<br/>
-        LONGTEXT<br/>
-      </td>
-      <td>STRING</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        TINYBLOB<br/>
-        BLOB<br/>
-        MEDIUMBLOB<br/>
-        LONGBLOB<br/>
-      </td>
-      <td>BYTES</td>
-      <td>Currently, for BLOB data type in MySQL, only the blob whose length 
isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. </td>
-    </tr>
-    <tr>
-      <td>
-        YEAR
-      </td>
-      <td>INT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        ENUM
-      </td>
-      <td>STRING</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        JSON
-      </td>
-      <td>STRING</td>
-      <td>The JSON data type  will be converted into STRING with JSON format 
in Flink.</td>
-    </tr>
-    <tr>
-      <td>
-        SET
-      </td>
-      <td>ARRAY&lt;STRING&gt;</td>
-      <td>As the SET data type in MySQL is a string object that can have zero 
or more values, 
-          it should always be mapped to an array of string
-      </td>
-    </tr>
-    </tbody>
-</table>
-</div>
+| MySQL type | Flink SQL type | NOTE |
+| --- | --- | --- |
+| TINYINT | TINYINT |     |
+| SMALLINT<br/>TINYINT UNSIGNED | SMALLINT |     |
+| INT<br/>MEDIUMINT<br/>SMALLINT UNSIGNED | INT |     |
+| BIGINT<br/>INT UNSIGNED | BIGINT |     |
+| BIGINT UNSIGNED | DECIMAL(20, 0) |     |
+| REAL<br/>FLOAT | FLOAT |     |
+| DOUBLE | DOUBLE |     |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where p <= 38 | DECIMAL(p, s) |     |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where 38 < p <= 65 | STRING | The 
precision for DECIMAL data type is up to 65 in MySQL, but the precision for 
DECIMAL is limited to 38 in Flink. So if you define a decimal column whose 
precision is greater than 38, you should map it to STRING to avoid precision 
loss. |
+| BOOLEAN<br/>TINYINT(1)<br/>BIT(1) | BOOLEAN |     |
+| DATE | DATE |     |
+| TIME [(p)] | TIME [(p)] |     |
+| TIMESTAMP [(p)]<br/>DATETIME [(p)] | TIMESTAMP [(p)] |     |
+| CHAR(n) | CHAR(n) |     |
+| VARCHAR(n) | VARCHAR(n) |     |
+| BIT(n) | BINARY(⌈n/8⌉) |     |
+| BINARY(n) | BINARY(n) |     |
+| VARBINARY(N) | VARBINARY(N) |     |
+| TINYTEXT<br/>TEXT<br/>MEDIUMTEXT<br/>LONGTEXT | STRING |     |
+| TINYBLOB<br/>BLOB<br/>MEDIUMBLOB<br/>LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length isn't greater than 2,147,483,647 (2^31 - 1) are supported. |
+| YEAR | INT |     |
+| ENUM | STRING |     |
+| JSON | STRING | The JSON data type will be converted into STRING with JSON 
format in Flink. |
+| SET | ARRAY&lt;STRING&gt; | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
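+
+To illustrate the DECIMAL rule above: a column whose precision exceeds 38 should be declared as STRING on the Flink side. This sketch assumes a hypothetical source table `orders` with a `DECIMAL(50, 2)` column; all names are made up:
+
+```sql
+-- Flink side: DECIMAL(50, 2) exceeds Flink's precision limit of 38,
+-- so it is mapped to STRING to avoid precision loss.
+CREATE TABLE `orders_extract` (
+     `id` INT,
+     `amount` STRING,
+     PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+     'connector' = 'mysql-cdc-inlong',
+     'hostname' = 'localhost',
+     'username' = 'inlong_user',
+     'password' = 'inlong_pass',
+     'database-name' = 'test',
+     'table-name' = 'orders'
+)
+```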
 
 ## Features
 
@@ -662,3 +261,53 @@ WITH (
 'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*'
 )
 ````
+
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for MySQL CDC 
consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
+- `earliest-offset`: Skips the snapshot phase and starts reading binlog events from the earliest accessible binlog offset.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup, and just reads from the end of the binlog, which means it only captures changes made after the connector was started.
+- `specific-offset`: Skips the snapshot phase and starts reading binlog events from a specific offset. The offset can be specified with a binlog filename and position, or a GTID set if GTID is enabled on the server.
+- `timestamp`: Skips the snapshot phase and starts reading binlog events from a specific timestamp.
+
+For example in DataStream API:
+```java
+MySQLSource.builder()
+    .startupOptions(StartupOptions.earliest()) // Start from earliest offset
+    .startupOptions(StartupOptions.latest()) // Start from latest offset
+    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // Start from binlog file and offset
+    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // Start from GTID set
+    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
+    ...
+    .build()
+```
+
+and with SQL:
+
+```SQL
+CREATE TABLE mysql_source (...) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'scan.startup.mode' = 'earliest-offset', -- Start from earliest offset
+    'scan.startup.mode' = 'latest-offset', -- Start from latest offset
+    'scan.startup.mode' = 'specific-offset', -- Start from specific offset
+    'scan.startup.mode' = 'timestamp', -- Start from timestamp
+    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog 
filename under specific offset startup mode
+    'scan.startup.specific-offset.pos' = '4', -- Binlog position under 
specific offset mode
+    'scan.startup.specific-offset.gtid-set' = 
'24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific offset 
startup mode
+    'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under 
timestamp startup mode
+    ...
+)
+```
+
+:::caution
+
+- MySQL source will print the current binlog position into logs with INFO 
level on checkpoint, with the prefix
+   "Binlog offset on checkpoint {checkpoint-id}". It could be useful if you 
want to restart the job from a specific checkpointed position.
+- If the schema of the captured tables was changed previously, starting from an earliest offset, a specific offset or a timestamp
+   could fail, as the Debezium reader keeps the current latest table schema internally, and earlier records with an unmatched schema cannot be correctly parsed.
+
+:::
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
index d53f3e8572..4a9aa55e4d 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
@@ -114,294 +114,57 @@ TODO: 将在未来支持此功能。
 
 ## MySQL Extract 节点参数
 
-<div class="highlight">
-<table class="colwidths-auto docutils">
-    <thead>
-      <tr>
-        <th class="text-left" style={{width: '10%'}}>参数</th>
-        <th class="text-left" style={{width: '8%'}}>是否必须</th>
-        <th class="text-left" style={{width: '7%'}}>默认值</th>
-        <th class="text-left" style={{width: '10%'}}>数据类型</th>
-        <th class="text-left" style={{width: '65%'}}>描述</th>
-      </tr>
-    </thead>
-    <tbody>
-    <tr>
-      <td>connector</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>指定要使用的连接器,这里应该是 <code>'mysql-cdc-inlong'</code>。</td>
-    </tr>
-    <tr>
-      <td>hostname</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>MySQL 数据库服务器的 IP 地址或主机名。</td>
-    </tr>
-    <tr>
-      <td>username</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>连接到 MySQL 数据库服务器时要使用的 MySQL 用户名称。</td>
-    </tr>
-    <tr>
-      <td>password</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>连接到 MySQL 数据库服务器时使用的密码。</td>
-    </tr>
-    <tr>
-      <td>database-name</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>要监控的 MySQL 服务器的数据库名称。 database-name 还支持正则表达式来监控多个表是否匹配正则表达式。</td>
-    </tr> 
-    <tr>
-      <td>table-name</td>
-      <td>required</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>要监控的 MySQL 数据库的表名。 table-name 还支持正则表达式来监控多个表是否匹配正则表达式。</td>
-    </tr>
-    <tr>
-      <td>port</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>3306</td>
-      <td>Integer</td>
-      <td>MySQL 数据库服务器的整数端口号。</td>
-    </tr>
-    <tr>
-      <td>server-id</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>Integer</td>
-      <td>此数据库客户端的数字 Id 或数字 Id 范围,数字 Id 语法类似于 '5400',
-          数字 Id 
范围语法类似于“5400-5408”,启用“scan.incremental.snapshot.enabled”时建议使用数字 Id 范围语法。
-          在 MySQL 集群中所有当前运行的数据库进程中,每个 Id 都必须是唯一的。此连接器加入 MySQL 集群
-          作为另一台服务器(具有此唯一 Id),因此它可以读取 Binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数,
-          尽管我们建议设置一个明确的值。
-      </td>
-    </tr>
-    <tr>
-          <td>scan.incremental.snapshot.enabled</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>true</td>
-          <td>Boolean</td>
-          <td>增量快照是一种读取表快照的新机制。与旧的快照机制相比,
-              增量快照有很多优点,包括:
-                (1) 快照读取时 Source 可以并行,
-                (2) Source 可以在快照读取时在 Chunk 粒度上进行检查点,
-                (3) Source 在读快照前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)。
-              如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,所以
-              'server-id' 必须是 '5400-6400' 这样的范围,并且范围必须大于并行度。
-              请参阅<a 
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading";>增量快照阅读</a>部分了解更多详细信息。
-          </td>
-    </tr>
-    <tr>
-          <td>scan.incremental.snapshot.chunk.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>8096</td>
-          <td>Integer</td>
-          <td>表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。</td>
-    </tr>
-    <tr>
-          <td>scan.snapshot.fetch.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>1024</td>
-          <td>Integer</td>
-          <td>读取表快照时每次轮询的最大获取大小。</td>
-    </tr>
-    <tr>
-      <td>scan.startup.mode</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>initial</td>
-      <td>String</td>
-      <td>MySQL CDC 消费者的可选启动模式,有效枚举为"initial"
-           和"latest-offset"。
-           请参阅<a 
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#startup-reading-position";>启动阅读位置</a>部分了解更多详细信息。</td>
-    </tr> 
-    <tr>
-      <td>server-time-zone</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>UTC</td>
-      <td>String</td>
-      <td>数据库服务器中的会话时区,例如"Asia/Shanghai"。
-          它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。
-          查看更多<a 
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types";>这里</a>。</td>
-    </tr>
-    <tr>
-      <td>debezium.min.row.
-      count.to.stream.result</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>1000</td>
-      <td>Integer</td>
-      <td>在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。此参数确定 MySQL 
连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为
 1,000。将此参数设置为'0'以跳过所有表大小检查并始终在快照期间流式传输所有结果。</td>
-    </tr>
-    <tr>
-          <td>connect.timeout</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>30s</td>
-          <td>Duration</td>
-          <td>连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。</td>
-    </tr>    
-    <tr>
-          <td>connect.max-retries</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>3</td>
-          <td>Integer</td>
-          <td>连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。</td>
-    </tr>
-    <tr>
-          <td>connection.pool.size</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>20</td>
-          <td>Integer</td>
-          <td>连接池大小。</td>
-    </tr>
-    <tr>
-          <td>jdbc.properties.*</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>20</td>
-          <td>String</td>
-          <td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,例如 'jdbc.properties.useSSL' = 
'false'。</td>
-    </tr>
-    <tr>
-          <td>heartbeat.interval</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>30s</td>
-          <td>Duration</td>
-          <td>发送心跳事件的时间间隔,用于跟踪最新可用的 Binlog 偏移量。</td>
-    </tr>
-    <tr>
-          <td>append-mode</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>是否仅支持 Append,如果为 'true',MySQL Extract Node 会将所有 Upsert 流转换为 
Append 流,以支持不支持 Upsert 流的下游场景。</td>
-    </tr>
-    <tr>
-          <td>migrate-all</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>是否是全库迁移场景,如果为 'true',MySQL Extract Node 则将表的物理字段和其他元字段压缩成 'json' 
-              格式的特殊 'data' 元字段, 目前支持两种 data 格式, 如果需要 'canal json' 格式的数据,
-              则使用 'data_canal' 元数据字段,如果需要使用 'debezium json' 格式的数据则使用 
'data_debezium' 元数据字段。</td>
-    </tr>
-    <tr>
-          <td>row-kinds-filtered</td>
-          <td>optional</td>
-          <td style={{wordWrap: 'break-word'}}>false</td>
-          <td>Boolean</td>
-          <td>需要保留的特定的操作类型,其中 +U 对应更新前的数据,-U 对应更新后的数据,+I 对应
-              插入的数据(存量数据为插入类型的数据),-D 代表删除的数据, 如需保留多个操作类型则使用 & 连接。
-              举例 +I&-D,connector 只会输出插入以及删除的数据,更新的数据则不会输出。</td>
-    </tr>
-    <tr>
-      <td>debezium.*</td>
-      <td>optional</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>将 Debezium 的属性传递给用于从 MySQL 服务器捕获数据更改的 Debezium Embedded Engine。
-          例如:<code>'debezium.snapshot.mode' = 'never'</code>。
-          详细了解 <a 
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties";>Debezium
 的 MySQL 连接器属性。</a></td> 
-    </tr>
-    <tr>
-      <td>inlong.metric.labels</td>
-      <td>可选</td>
-      <td style={{wordWrap: 'break-word'}}>(none)</td>
-      <td>String</td>
-      <td>inlong metric 
的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。</td> 
-    </tr>
-    </tbody>
-</table>
-</div>
+| Option | Required | Default | Type | Description |
+| --- | --- | --- | --- | --- |
+| connector | required | (none) | String | Specify what connector to use, here should be `'mysql-cdc-inlong'`. |
+| hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
+| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server. |
+| password | required | (none) | String | Password to use when connecting to the MySQL database server. |
+| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions, to monitor multiple databases that match the regular expression. |
+| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions, to monitor multiple tables that match the regular expression. |
+| port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
+| server-id | optional | (none) | Integer | A numeric ID or numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'; the range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value. |
+| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism for reading table snapshots. Compared to the old snapshot mechanism, it has many advantages, including: (1) the source can read snapshots in parallel, (2) the source can checkpoint at chunk granularity while reading snapshots, (3) the source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before reading a snapshot. If you would like the source to run in parallel, each parallel reader should have a unique server ID, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. See [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) for more details. |
+| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of a table snapshot; the snapshot of a table is split into multiple chunks when it is read. |
+| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading a table snapshot. |
+| scan.startup.mode | optional | initial | String | Optional startup mode for the MySQL CDC consumer; valid enumerations are 'initial', 'earliest-offset', 'latest-offset', 'specific-offset' and 'timestamp'. See [Startup Reading Position](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#startup-reading-position) for more details. |
+| scan.startup.specific-offset.file | optional | (none) | String | The binlog file name of the starting point under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.pos | optional | (none) | Long | The binlog file position of the starting point under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.gtid-set | optional | (none) | String | The GTID set of the starting point under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.skip-events | optional | (none) | Long | The number of events to skip after the specified starting point. |
+| scan.startup.specific-offset.skip-rows | optional | (none) | Long | The number of rows to skip after the specified starting point. |
+| server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. 'Asia/Shanghai'. It controls how the TIMESTAMP type in MySQL is converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types). |
+| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot, the connector queries each included table to produce a read event for every row in that table. This parameter determines whether the MySQL connection pulls all of a table's results into memory (fast, but memory-hungry) or streams the results instead (can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector streams results; the default is 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot. |
+| connect.timeout | optional | 30s | Duration | The maximum time the connector should wait after trying to connect to the MySQL database server before timing out. |
+| connect.max-retries | optional | 3 | Integer | The maximum number of retries the connector should make to build a MySQL database server connection. |
+| connection.pool.size | optional | 20 | Integer | The connection pool size. |
+| jdbc.properties.* | optional | (none) | String | Options to pass custom JDBC URL properties. Users can pass custom properties, e.g. 'jdbc.properties.useSSL' = 'false'. |
+| heartbeat.interval | optional | 30s | Duration | The interval of sending heartbeat events, which is used to track the latest available binlog offset. |
+| append-mode | optional | false | Boolean | Whether to support append-only mode. If 'true', the MySQL Extract Node converts all upsert streams into append streams, to support downstream scenarios that cannot handle upsert streams. |
+| migrate-all | optional | false | Boolean | Whether this is a whole-database migration scenario. If 'true', the MySQL Extract Node compresses a table's physical fields and other meta fields into a special 'data' meta field in 'json' format. Two data formats are currently supported: use the 'data_canal' metadata field for data in 'canal json' format, or the 'data_debezium' metadata field for data in 'debezium json' format. |
+| row-kinds-filtered | optional | false | Boolean | The specific operation types to keep, where +U corresponds to the data before an update, -U to the data after an update, +I to inserted data (stock data is insert-type data), and -D to deleted data. To keep multiple operation types, join them with &. For example, with +I&-D the connector only outputs inserted and deleted data, and updated data is not output. |
+| debezium.* | optional | (none) | String | Pass Debezium's properties to the Debezium Embedded Engine that is used to capture data changes from the MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about [Debezium's MySQL connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties). |
+| inlong.metric.labels | optional | (none) | String | The label value of the inlong metric, in the form groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
 
 ## Available Metadata
 
 The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
 
-<table class="colwidths-auto docutils">
-  <thead>
-     <tr>
-       <th class="text-left" style={{width: '15%'}}>字段名称</th>
-       <th class="text-left" style={{width: '30%'}}>数据类型</th>
-       <th class="text-left" style={{width: '55%'}}>描述</th>
-     </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>meta.table_name</td>
-      <td>STRING NOT NULL</td>
-      <td>该行所属的表名。</td>
-    </tr>
-    <tr>
-      <td>meta.database_name</td>
-      <td>STRING NOT NULL</td>
-      <td>该行所属的数据库名称。</td>
-    </tr>
-    <tr>
-      <td>meta.op_ts</td>
-      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
-      <td>它指示在数据库中进行更改的时间。 <br/>如果记录是从表的快照而不是binlog中读取的,则该值始终为0。</td>
-    </tr>
-    <tr>
-      <td>meta.op_type</td>
-      <td>STRING</td>
-      <td>数据库操作的类型,如 INSERT/DELETE 等。</td>
-    </tr>
-    <tr>
-      <td>meta.data_canal</td>
-      <td>STRING/BYTES</td>
-      <td>`canal-json` 格式化的行的数据只有在 `migrate-all` 选项为 'true' 时才存在。</td>
-    </tr>
-    <tr>
-      <td>meta.data_debezium</td>
-      <td>STRING/BYTES</td>
-      <td>`debezium-json` 格式化的行的数据只有在 `migrate-all` 选项为 'true' 时才存在。</td>
-    </tr>
-    <tr>
-      <td>meta.is_ddl</td>
-      <td>BOOLEAN</td>
-      <td>是否是 DDL 语句。</td>
-    </tr>
-    <tr>
-      <td>meta.ts</td>
-      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
-      <td>接收和处理行的当前时间。</td>
-    </tr>
-    <tr>
-      <td>meta.sql_type</td>
-      <td>MAP</td>
-      <td>将 Sql_type 表字段映射到 Java 数据类型 Id。</td>
-    </tr>
-    <tr>
-      <td>meta.mysql_type</td>
-      <td>MAP</td>
-      <td>表的结构。</td>
-    </tr>
-    <tr>
-      <td>meta.pk_names</td>
-      <td>ARRAY</td>
-      <td>表的主键名称。</td>
-    </tr>
-    <tr>
-      <td>meta.batch_id</td>
-      <td>BIGINT</td>
-      <td>Binlog的批次id。</td>
-    </tr>
-    <tr>
-      <td>meta.update_before</td>
-      <td>ARRAY</td>
-      <td>该行更新前的数据。</td>
-    </tr>
-  </tbody>
-</table>
+| Field name | Data type | Description |
+| --- | --- | --- |
+| meta.table_name | STRING NOT NULL | Name of the table that contains the row. |
+| meta.database_name | STRING NOT NULL | Name of the database that contains the row. |
+| meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. <br/>If the record is read from a snapshot of the table instead of the binlog, the value is always 0. |
+| meta.op_type | STRING | Type of the database operation, such as INSERT/DELETE. |
+| meta.data_canal | STRING/BYTES | Data of the row formatted as `canal-json`; only exists when the `migrate-all` option is 'true'. |
+| meta.data_debezium | STRING/BYTES | Data of the row formatted as `debezium-json`; only exists when the `migrate-all` option is 'true'. |
+| meta.is_ddl | BOOLEAN | Whether it is a DDL statement. |
+| meta.ts | TIMESTAMP_LTZ(3) NOT NULL | The current time when the row was received and processed. |
+| meta.sql_type | MAP | Maps the sql_type table fields to Java data type IDs. |
+| meta.mysql_type | MAP | Structure of the table. |
+| meta.pk_names | ARRAY | Primary key names of the table. |
+| meta.batch_id | BIGINT | Batch ID of the binlog. |
+| meta.update_before | ARRAY | Data of the row before the update. |
 
 The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
 
@@ -437,198 +200,32 @@ CREATE TABLE `mysql_extract_node` (
 
 ## Data Type Mapping
 
-<div class="wy-table-responsive">
-<table class="colwidths-auto docutils">
-    <thead>
-      <tr>
-        <th class="text-left">MySQL type</th>
-        <th class="text-left">Flink SQL type</th>
-        <th class="text-left">NOTE</th>
-      </tr>
-    </thead>
-    <tbody>
-    <tr>
-      <td>TINYINT</td>
-      <td>TINYINT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        SMALLINT<br/>
-        TINYINT UNSIGNED</td>
-      <td>SMALLINT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        INT<br/>
-        MEDIUMINT<br/>
-        SMALLINT UNSIGNED</td>
-      <td>INT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BIGINT<br/>
-        INT UNSIGNED</td>
-      <td>BIGINT</td>
-      <td></td>
-    </tr>
-   <tr>
-      <td>BIGINT UNSIGNED</td>
-      <td>DECIMAL(20, 0)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        REAL<br/>
-        FLOAT<br/>
-        </td>
-      <td>FLOAT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        DOUBLE
-      </td>
-      <td>DOUBLE</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        NUMERIC(p, s)<br/>
-        DECIMAL(p, s)<br/>
-        where p &lt;= 38<br/>
-      </td>
-      <td>DECIMAL(p, s)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        NUMERIC(p, s)<br/>
-        DECIMAL(p, s)<br/>
-        where 38 &lt; p &lt;= 65<br/>
-      </td>
-      <td>STRING</td>
-      <td>The precision for DECIMAL data type is up to 65 in MySQL, but the 
precision for DECIMAL is limited to 38 in Flink.
-  So if you define a decimal column whose precision is greater than 38, you 
should map it to STRING to avoid precision loss.</td>
-    </tr>
-    <tr>
-      <td>
-        BOOLEAN<br/>
-        TINYINT(1)<br/>
-        BIT(1)
-        </td>
-      <td>BOOLEAN</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>DATE</td>
-      <td>DATE</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>TIME [(p)]</td>
-      <td>TIME [(p)]</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>TIMESTAMP [(p)]<br/>
-        DATETIME [(p)]
-      </td>
-      <td>TIMESTAMP [(p)]
-      </td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        CHAR(n)
-      </td>
-      <td>CHAR(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        VARCHAR(n)
-      </td>
-      <td>VARCHAR(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BIT(n)
-      </td>
-      <td>BINARY(⌈n/8⌉)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        BINARY(n)
-      </td>
-      <td>BINARY(n)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        VARBINARY(N)
-      </td>
-      <td>VARBINARY(N)</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        TINYTEXT<br/>
-        TEXT<br/>
-        MEDIUMTEXT<br/>
-        LONGTEXT<br/>
-      </td>
-      <td>STRING</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        TINYBLOB<br/>
-        BLOB<br/>
-        MEDIUMBLOB<br/>
-        LONGBLOB<br/>
-      </td>
-      <td>BYTES</td>
-      <td>Currently, for BLOB data type in MySQL, only the blob whose length 
isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. </td>
-    </tr>
-    <tr>
-      <td>
-        YEAR
-      </td>
-      <td>INT</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        ENUM
-      </td>
-      <td>STRING</td>
-      <td></td>
-    </tr>
-    <tr>
-      <td>
-        JSON
-      </td>
-      <td>STRING</td>
-      <td>The JSON data type  will be converted into STRING with JSON format 
in Flink.</td>
-    </tr>
-    <tr>
-      <td>
-        SET
-      </td>
-      <td>ARRAY&lt;STRING&gt;</td>
-      <td>As the SET data type in MySQL is a string object that can have zero 
or more values, 
-          it should always be mapped to an array of string
-      </td>
-    </tr>
-    </tbody>
-</table>
-</div>
-
+| MySQL type | Flink SQL type | NOTE |
+| --- | --- | --- |
+| TINYINT | TINYINT |  |
+| SMALLINT<br/>TINYINT UNSIGNED | SMALLINT |  |
+| INT<br/>MEDIUMINT<br/>SMALLINT UNSIGNED | INT |  |
+| BIGINT<br/>INT UNSIGNED | BIGINT |  |
+| BIGINT UNSIGNED | DECIMAL(20, 0) |  |
+| REAL<br/>FLOAT | FLOAT |  |
+| DOUBLE | DOUBLE |  |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where p <= 38 | DECIMAL(p, s) |  |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where 38 < p <= 65 | STRING | The precision of the DECIMAL data type is up to 65 in MySQL, but the precision of DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
+| BOOLEAN<br/>TINYINT(1)<br/>BIT(1) | BOOLEAN |  |
+| DATE | DATE |  |
+| TIME [(p)] | TIME [(p)] |  |
+| TIMESTAMP [(p)]<br/>DATETIME [(p)] | TIMESTAMP [(p)] |  |
+| CHAR(n) | CHAR(n) |  |
+| VARCHAR(n) | VARCHAR(n) |  |
+| BIT(n) | BINARY(⌈n/8⌉) |  |
+| BINARY(n) | BINARY(n) |  |
+| VARBINARY(N) | VARBINARY(N) |  |
+| TINYTEXT<br/>TEXT<br/>MEDIUMTEXT<br/>LONGTEXT | STRING |  |
+| TINYBLOB<br/>BLOB<br/>MEDIUMBLOB<br/>LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length is not greater than 2,147,483,647 (2^31 - 1) are supported. |
+| YEAR | INT |  |
+| ENUM | STRING |  |
+| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
+| SET | ARRAY&lt;STRING&gt; | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
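For instance, following the DECIMAL note in the table above, a MySQL column with precision greater than 38 would be declared as STRING on the Flink side (a sketch with hypothetical table and column names):

```sql
-- MySQL side: `balance` DECIMAL(45, 10) exceeds Flink's DECIMAL precision limit of 38,
-- so the Flink DDL declares it as STRING to avoid precision loss.
CREATE TABLE `account_source` (
    `id` INT NOT NULL,
    `balance` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc-inlong',
    ...
)
```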
 
 ## Features
 
@@ -659,4 +256,51 @@ WITH (
 'password' = 'inlong',
 'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*'
 )
-```
\ No newline at end of file
+```
+
+### Startup Mode
+
+The config option `scan.startup.mode` specifies the startup mode for the MySQL CDC consumer. Valid enumerations include:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, then continues to read the latest binlog.
+- `earliest-offset`: Skips the snapshot phase and starts reading from the earliest accessible binlog offset.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup. The connector only reads from the end of the binlog, which means it only captures changes made after the connector starts.
+- `specific-offset`: Skips the snapshot phase and starts reading binlog events from a specific offset. The offset can be specified with a binlog file name and position, or with a GTID set if GTID is enabled on the cluster.
+- `timestamp`: Skips the snapshot phase and starts reading binlog events from a specific timestamp.
+
+For example, using the DataStream API:
+```java
+MySQLSource.builder()
+    .startupOptions(StartupOptions.earliest()) // Start from earliest offset
+    .startupOptions(StartupOptions.latest()) // Start from latest offset
+    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // Start from binlog file and position
+    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // Start from GTID set
+    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
+    ...
+    .build()
+```
+
+Using SQL:
+
+```SQL
+CREATE TABLE mysql_source (...) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'scan.startup.mode' = 'earliest-offset', -- Start from earliest offset
+    'scan.startup.mode' = 'latest-offset', -- Start from latest offset
+    'scan.startup.mode' = 'specific-offset', -- Start from specific offset
+    'scan.startup.mode' = 'timestamp', -- Start from timestamp
+    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog file name under specific offset startup mode
+    'scan.startup.specific-offset.pos' = '4', -- Binlog position under specific offset startup mode
+    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific offset startup mode
+    'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under timestamp startup mode
+    ...
+)
+```
+
+:::caution
+
+- The MySQL source prints the current binlog position into the logs at INFO level on each checkpoint, with the prefix "Binlog offset on checkpoint {checkpoint-id}". This can be useful if you want to restart the job from a specific checkpointed position.
+- If the schema of the captured tables was changed previously, starting from the earliest offset, a specific offset, or a timestamp could fail, because the Debezium reader keeps the current latest table schema internally, and earlier records whose schema does not match cannot be correctly parsed.
+
+:::
\ No newline at end of file
