This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git
The following commit(s) were added to refs/heads/master by this push:
new 73002588e4 [INLONG-774][Sort] Add Doc for MySQL CDC support read data
from specific timestamp / earliest offset / specific offset (#775)
73002588e4 is described below
commit 73002588e496addfd69ad3bcc0cf6f68b8ebb717
Author: emhui <[email protected]>
AuthorDate: Mon Jun 19 10:06:59 2023 +0800
[INLONG-774][Sort] Add Doc for MySQL CDC support read data from specific
timestamp / earliest offset / specific offset (#775)
---
docs/data_node/extract_node/mysql-cdc.md | 597 +++++----------------
.../current/data_node/extract_node/mysql-cdc.md | 596 +++++---------------
2 files changed, 243 insertions(+), 950 deletions(-)
diff --git a/docs/data_node/extract_node/mysql-cdc.md
b/docs/data_node/extract_node/mysql-cdc.md
index 0304c7d088..2700539f9b 100644
--- a/docs/data_node/extract_node/mysql-cdc.md
+++ b/docs/data_node/extract_node/mysql-cdc.md
@@ -118,296 +118,60 @@ TODO: It will be supported in the future.
## MySQL Extract Node Options
-<div class="highlight">
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left" style={{width: '10%'}}>Option</th>
- <th class="text-left" style={{width: '8%'}}>Required</th>
- <th class="text-left" style={{width: '7%'}}>Default</th>
- <th class="text-left" style={{width: '10%'}}>Type</th>
- <th class="text-left" style={{width: '65%'}}>Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>connector</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Specify what connector to use, here should be
<code>'mysql-cdc-inlong'</code>.</td>
- </tr>
- <tr>
- <td>hostname</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>IP address or hostname of the MySQL database server.</td>
- </tr>
- <tr>
- <td>username</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Name of the MySQL database to use when connecting to the MySQL
database server.</td>
- </tr>
- <tr>
- <td>password</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Password to use when connecting to the MySQL database server.</td>
- </tr>
- <tr>
- <td>database-name</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Database name of the MySQL server to monitor. The database-name also
supports regular expressions to monitor multiple tables matches the regular
expression.</td>
- </tr>
- <tr>
- <td>table-name</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Table name of the MySQL database to monitor. The table-name also
supports regular expressions to monitor multiple tables matches the regular
expression.</td>
- </tr>
- <tr>
- <td>port</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>3306</td>
- <td>Integer</td>
- <td>Integer port number of the MySQL database server.</td>
- </tr>
- <tr>
- <td>server-id</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>Integer</td>
- <td>A numeric ID or a numeric ID range of this database client, The
numeric ID syntax is like '5400',
- the numeric ID range syntax is like '5400-5408', The numeric ID
range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled.
- Every ID must be unique across all currently-running database
processes in the MySQL cluster. This connector joins the MySQL cluster
- as another server (with this unique ID) so it can read the binlog.
By default, a random number is generated between 5400 and 6400,
- though we recommend setting an explicit value.
- </td>
- </tr>
- <tr>
- <td>scan.incremental.snapshot.enabled</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>true</td>
- <td>Boolean</td>
- <td>Incremental snapshot is a new mechanism to read snapshot of a
table. Compared to the old snapshot mechanism,
- the incremental snapshot has many advantages, including:
- (1) source can be parallel during snapshot reading,
- (2) source can perform checkpoints in the chunk granularity
during snapshot reading,
- (3) source doesn't need to acquire global read lock (FLUSH
TABLES WITH READ LOCK) before snapshot reading.
- If you would like the source run in parallel, each parallel
reader should have an unique server id, so
- the 'server-id' must be a range like '5400-6400', and the range
must be larger than the parallelism.
- Please see <a
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading">Incremental
Snapshot Reading</a>section for more detailed information.
- </td>
- </tr>
- <tr>
- <td>scan.incremental.snapshot.chunk.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>8096</td>
- <td>Integer</td>
- <td>The chunk size (number of rows) of table snapshot, captured
tables are split into multiple chunks when read the snapshot of table.</td>
- </tr>
- <tr>
- <td>scan.snapshot.fetch.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>1024</td>
- <td>Integer</td>
- <td>The maximum fetch size for per poll when read table
snapshot.</td>
- </tr>
- <tr>
- <td>scan.startup.mode</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>initial</td>
- <td>String</td>
- <td>Optional startup mode for MySQL CDC consumer, valid enumerations are
"initial"
- and "latest-offset".
- Please see <a
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#startup-reading-position">Startup
Reading Position</a>section for more detailed information.</td>
- </tr>
- <tr>
- <td>server-time-zone</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>UTC</td>
- <td>String</td>
- <td>The session time zone in database server, e.g. "Asia/Shanghai".
- It controls how the TIMESTAMP type in MYSQL converted to STRING.
- See more <a
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types">here</a>.</td>
- </tr>
- <tr>
- <td>debezium.min.row.
- count.to.stream.result</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>1000</td>
- <td>Integer</td>
- <td>During a snapshot operation, the connector will query each included
table to produce a read event for all rows in that table. This parameter
determines whether the MySQL connection will pull all results for a table into
memory (which is fast but requires large amounts of memory), or whether the
results will instead be streamed (can be slower, but will work for very large
tables). The value specifies the minimum number of rows a table must contain
before the connector will strea [...]
- </tr>
- <tr>
- <td>connect.timeout</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>30s</td>
- <td>Duration</td>
- <td>The maximum time that the connector should wait after trying to
connect to the MySQL database server before timing out.</td>
- </tr>
- <tr>
- <td>connect.max-retries</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>3</td>
- <td>Integer</td>
- <td>The max retry times that the connector should retry to build
MySQL database server connection.</td>
- </tr>
- <tr>
- <td>connection.pool.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>20</td>
- <td>Integer</td>
- <td>The connection pool size.</td>
- </tr>
- <tr>
- <td>jdbc.properties.*</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>20</td>
- <td>String</td>
- <td>Option to pass custom JDBC URL properties. User can pass custom
properties like 'jdbc.properties.useSSL' = 'false'.</td>
- </tr>
- <tr>
- <td>heartbeat.interval</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>30s</td>
- <td>Duration</td>
- <td>The interval of sending heartbeat event for tracing the latest
available binlog offsets.</td>
- </tr>
- <tr>
- <td>append-mode</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>Whether to support append only, if true the MySQL Extract Node
will Convert all upsert streams to append streams to support downstream
scenarios where upsert streams are not supported.</td>
- </tr>
- <tr>
- <td>migrate-all</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>Whether it is a full database migration scenario, if it is
'true', MySQL Extract Node will compress the physical fields and other meta
fields of the table into 'json'.
- The special 'data' meta field of the format, currently supports
two data formats, if you need data in 'canal json' format,
- then use the 'data_canal' metadata field, or use the
'data_debezium' metadata field if data in 'debezium json' format is
required.</td>
- </tr>
- <tr>
- <td>row-kinds-filtered</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>The specific operation type that needs to be retained, where +U
corresponds to the data before the update, -U corresponds to the updated data,
and +I corresponds to the data before the update.
- Inserted data (the existing data is the data of the insert
type), -D represents the deleted data, if you want to keep multiple operation
types, use & connection.
- For example +I&-D, the connector will only output the inserted
and deleted data, and the updated data will not be output. </td>
- </tr>
- <tr>
- <td>debezium.*</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Pass-through Debezium's properties to Debezium Embedded Engine which
is used to capture data changes from MySQL server.
- For example: <code>'debezium.snapshot.mode' = 'never'</code>.
- See more about the <a
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties">Debezium's
MySQL Connector properties</a></td>
- </tr>
- <tr>
- <td>inlong.metric.labels</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>Inlong metric label, format of value is
groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId].</td>
- </tr>
- </tbody>
-</table>
-</div>
+|Option| Required| Default| Type| Description|
+| --- | --- | --- | --- | --- |
+| connector | required | (none) | String | Specify what connector to use,
here should be `'mysql-cdc-inlong'`.|
+| hostname | required | (none) | String | IP address or hostname of the
MySQL database server. |
+| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server.|
+| password | required | (none) | String | Password to use when connecting
to the MySQL database server.|
+| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions, to monitor multiple databases that match the regular expression.|
+| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions, to monitor multiple tables that match the regular expression.|
+| port | optional | 3306 | Integer | Integer port number of the MySQL
database server.|
+| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range for this database client. The numeric ID syntax is like '5400'; the numeric ID range syntax is like '5400-5408'. The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value.|
+| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) the source can be parallel during snapshot reading, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. If you would like the source to run in parallel, each parallel reader should have a unique server id, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. Please see the [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) section for more detailed information.|
+| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of a table snapshot; captured tables are split into multiple chunks when reading the snapshot of a table.|
+| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading a table snapshot.|
+| scan.startup.mode | optional | initial | String | Startup mode for the MySQL CDC consumer; valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp". Please see the [Startup Reading Position](#startup-reading-position) section for more detailed information.|
+| scan.startup.specific-offset.file | optional | (none) | String | Optional binlog file name used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.pos | optional | (none) | Long | Optional binlog file position used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.gtid-set | optional | (none) | String | Optional GTID set used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.skip-events | optional | (none) | Long | Optional number of events to skip after the specific starting offset. |
+| scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset. |
+| server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types).|
+| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results. Defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.|
+| connect.timeout | optional | 30s | Duration | The maximum time that the
connector should wait after trying to connect to the MySQL database server
before timing out.|
+| connect.max-retries | optional | 3 | Integer | The maximum number of retries the connector makes when building the MySQL database server connection.|
+| connection.pool.size| optional| 20| Integer| The connection pool size.|
+| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. Users can pass custom properties like 'jdbc.properties.useSSL' = 'false'.|
+| heartbeat.interval| optional| 30s| Duration | The interval of sending
heartbeat event for tracing the latest available binlog offsets.|
+| append-mode | optional | false | Boolean | Whether to support append-only mode. If true, the MySQL Extract Node will convert all upsert streams to append streams, to support downstream scenarios where upsert streams are not supported.|
+| migrate-all | optional | false | Boolean | Whether this is a full database migration scenario. If 'true', the MySQL Extract Node will compress the physical fields and other meta fields of the table into a special 'data' meta field in 'json' format. Two data formats are currently supported: use the 'data_canal' metadata field if you need data in 'canal json' format, or the 'data_debezium' metadata field if data in 'debezium json' format is required.|
+| row-kinds-filtered | optional | false | Boolean | The specific operation types to retain, where +I represents inserted data, -U the data before an update, +U the data after an update, and -D deleted data. To keep multiple operation types, join them with &. For example, with +I&-D the connector will only output inserted and deleted data; updated data will not be output.|
+| debezium.* | optional | (none) | String | Pass-through Debezium properties to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties).|
+| inlong.metric.labels | optional | (none) | String | InLong metric label; the format of the value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
## Available Metadata
The following format metadata can be exposed as read-only (VIRTUAL) columns in
a table definition.
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left" style={{width: '15%'}}>Key</th>
- <th class="text-left" style={{width: '30%'}}>DataType</th>
- <th class="text-left" style={{width: '55%'}}>Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>meta.table_name</td>
- <td>STRING NOT NULL</td>
- <td>Name of the table that contain the row.</td>
- </tr>
- <tr>
- <td>meta.database_name</td>
- <td>STRING NOT NULL</td>
- <td>Name of the database that contain the row.</td>
- </tr>
- <tr>
- <td>meta.op_ts</td>
- <td>TIMESTAMP_LTZ(3) NOT NULL</td>
- <td>It indicates the time that the change was made in the database.
<br/>If the record is read from snapshot of the table instead of the binlog,
the value is always 0.</td>
- </tr>
- <tr>
- <td>meta.op_type</td>
- <td>STRING</td>
- <td>Type of database operation, such as INSERT/DELETE, etc.</td>
- </tr>
- <tr>
- <td>meta.data_canal</td>
- <td>STRING/BYTES</td>
- <td>Data for rows in `canal-json` format only exists when the
`migrate-all` option is 'true'.</td>
- </tr>
- <tr>
- <td>meta.data_debezium</td>
- <td>STRING/BYTES</td>
- <td>Data for `debezium-json` formatted lines only exists if the
`migrate-all` option is 'true'.</td>
- </tr>
- <tr>
- <td>meta.is_ddl</td>
- <td>BOOLEAN</td>
- <td>Whether the DDL statement.</td>
- </tr>
- <tr>
- <td>meta.ts</td>
- <td>TIMESTAMP_LTZ(3) NOT NULL</td>
- <td>The current time when the row was received and processed.</td>
- </tr>
- <tr>
- <td>meta.sql_type</td>
- <td>MAP</td>
- <td>Mapping of sql_type table fields to java data type IDs.</td>
- </tr>
- <tr>
- <td>meta.mysql_type</td>
- <td>MAP</td>
- <td>Structure of the table.</td>
- </tr>
- <tr>
- <td>meta.pk_names</td>
- <td>ARRAY</td>
- <td>Primay key name of the table.</td>
- </tr>
- <tr>
- <td>meta.batch_id</td>
- <td>BIGINT</td>
- <td>Batch id of the Binlog.</td>
- </tr>
- <tr>
- <td>meta.update_before</td>
- <td>ARRAY</td>
- <td>Data of the row before update.</td>
- </tr>
- </tbody>
-</table>
+| Key | DataType | Description|
+| --- | --- | --- |
+| meta.table_name | STRING NOT NULL | Name of the table that contains the row.|
+| meta.database_name | STRING NOT NULL | Name of the database that contains the row.|
+| meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the
change was made in the database. <br/>If the record is read from snapshot of
the table instead of the binlog, the value is always 0.|
+| meta.op_type | STRING | Type of database operation, such as INSERT/DELETE,
etc.|
+| meta.data_canal | STRING/BYTES | Data for rows in `canal-json` format; only exists when the `migrate-all` option is 'true'.|
+| meta.data_debezium | STRING/BYTES | Data for rows in `debezium-json` format; only exists when the `migrate-all` option is 'true'.|
+| meta.is_ddl | BOOLEAN | Whether it is a DDL statement.|
+| meta.ts | TIMESTAMP_LTZ(3) NOT NULL | The current time when the row was
received and processed. |
+| meta.sql_type | MAP | Mapping of sql_type table fields to Java data type IDs.|
+| meta.mysql_type | MAP | Structure of the table.|
+| meta.pk_names | ARRAY | Primary key names of the table.|
+| meta.batch_id | BIGINT | Batch id of the Binlog.|
+| meta.update_before | ARRAY | Data of the row before update.|
The extended CREATE TABLE example demonstrates the syntax for exposing these
metadata fields:
+
```sql
CREATE TABLE `mysql_extract_node` (
`id` INT,
@@ -440,197 +204,32 @@ CREATE TABLE `mysql_extract_node` (
## Data Type Mapping
-<div class="wy-table-responsive">
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left">MySQL type</th>
- <th class="text-left">Flink SQL type</th>
- <th class="text-left">NOTE</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>TINYINT</td>
- <td>TINYINT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- SMALLINT<br/>
- TINYINT UNSIGNED</td>
- <td>SMALLINT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- INT<br/>
- MEDIUMINT<br/>
- SMALLINT UNSIGNED</td>
- <td>INT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BIGINT<br/>
- INT UNSIGNED</td>
- <td>BIGINT</td>
- <td></td>
- </tr>
- <tr>
- <td>BIGINT UNSIGNED</td>
- <td>DECIMAL(20, 0)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- REAL<br/>
- FLOAT<br/>
- </td>
- <td>FLOAT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- DOUBLE
- </td>
- <td>DOUBLE</td>
- <td></td>
- </tr>
- <tr>
- <td>
- NUMERIC(p, s)<br/>
- DECIMAL(p, s)<br/>
- where p <= 38<br/>
- </td>
- <td>DECIMAL(p, s)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- NUMERIC(p, s)<br/>
- DECIMAL(p, s)<br/>
- where 38 < p <= 65<br/>
- </td>
- <td>STRING</td>
- <td>The precision for DECIMAL data type is up to 65 in MySQL, but the
precision for DECIMAL is limited to 38 in Flink.
- So if you define a decimal column whose precision is greater than 38, you
should map it to STRING to avoid precision loss.</td>
- </tr>
- <tr>
- <td>
- BOOLEAN<br/>
- TINYINT(1)<br/>
- BIT(1)
- </td>
- <td>BOOLEAN</td>
- <td></td>
- </tr>
- <tr>
- <td>DATE</td>
- <td>DATE</td>
- <td></td>
- </tr>
- <tr>
- <td>TIME [(p)]</td>
- <td>TIME [(p)]</td>
- <td></td>
- </tr>
- <tr>
- <td>TIMESTAMP [(p)]<br/>
- DATETIME [(p)]
- </td>
- <td>TIMESTAMP [(p)]
- </td>
- <td></td>
- </tr>
- <tr>
- <td>
- CHAR(n)
- </td>
- <td>CHAR(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- VARCHAR(n)
- </td>
- <td>VARCHAR(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BIT(n)
- </td>
- <td>BINARY(⌈n/8⌉)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BINARY(n)
- </td>
- <td>BINARY(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- VARBINARY(N)
- </td>
- <td>VARBINARY(N)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- TINYTEXT<br/>
- TEXT<br/>
- MEDIUMTEXT<br/>
- LONGTEXT<br/>
- </td>
- <td>STRING</td>
- <td></td>
- </tr>
- <tr>
- <td>
- TINYBLOB<br/>
- BLOB<br/>
- MEDIUMBLOB<br/>
- LONGBLOB<br/>
- </td>
- <td>BYTES</td>
- <td>Currently, for BLOB data type in MySQL, only the blob whose length
isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. </td>
- </tr>
- <tr>
- <td>
- YEAR
- </td>
- <td>INT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- ENUM
- </td>
- <td>STRING</td>
- <td></td>
- </tr>
- <tr>
- <td>
- JSON
- </td>
- <td>STRING</td>
- <td>The JSON data type will be converted into STRING with JSON format
in Flink.</td>
- </tr>
- <tr>
- <td>
- SET
- </td>
- <td>ARRAY<STRING></td>
- <td>As the SET data type in MySQL is a string object that can have zero
or more values,
- it should always be mapped to an array of string
- </td>
- </tr>
- </tbody>
-</table>
-</div>
+| MySQL type | Flink SQL type | NOTE |
+| --- | --- | --- |
+| TINYINT | TINYINT | |
+| SMALLINT<br/>TINYINT UNSIGNED | SMALLINT | |
+| INT<br/>MEDIUMINT<br/>SMALLINT UNSIGNED | INT | |
+| BIGINT<br/>INT UNSIGNED | BIGINT | |
+| BIGINT UNSIGNED | DECIMAL(20, 0) | |
+| REAL<br/>FLOAT | FLOAT | |
+| DOUBLE | DOUBLE | |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where p <= 38 | DECIMAL(p, s) | |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where 38 < p <= 65 | STRING | The
precision for DECIMAL data type is up to 65 in MySQL, but the precision for
DECIMAL is limited to 38 in Flink. So if you define a decimal column whose
precision is greater than 38, you should map it to STRING to avoid precision
loss. |
+| BOOLEAN<br/>TINYINT(1)<br/>BIT(1) | BOOLEAN | |
+| DATE | DATE | |
+| TIME [(p)] | TIME [(p)] | |
+| TIMESTAMP [(p)]<br/>DATETIME [(p)] | TIMESTAMP [(p)] | |
+| CHAR(n) | CHAR(n) | |
+| VARCHAR(n) | VARCHAR(n) | |
+| BIT(n) | BINARY(⌈n/8⌉) | |
+| BINARY(n) | BINARY(n) | |
+| VARBINARY(N) | VARBINARY(N) | |
+| TINYTEXT<br/>TEXT<br/>MEDIUMTEXT<br/>LONGTEXT | STRING | |
+| TINYBLOB<br/>BLOB<br/>MEDIUMBLOB<br/>LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length is not greater than 2,147,483,647 (2^31 - 1) are supported. |
+| YEAR | INT | |
+| ENUM | STRING | |
+| JSON | STRING | The JSON data type will be converted into STRING with JSON
format in Flink. |
+| SET | ARRAY&lt;STRING&gt; | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
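+
+As an illustration of the mapping above, a hypothetical MySQL table mixing these types could be declared on the Flink side as follows (table and column names are invented for the example):
+
+```sql
+-- Hypothetical DDL illustrating the type mapping above:
+--   BIGINT UNSIGNED -> DECIMAL(20, 0), DATETIME -> TIMESTAMP, JSON -> STRING
+CREATE TABLE `typed_source` (
+  `id` BIGINT,                 -- MySQL: INT UNSIGNED
+  `amount` DECIMAL(20, 0),     -- MySQL: BIGINT UNSIGNED
+  `created_at` TIMESTAMP(3),   -- MySQL: DATETIME(3)
+  `payload` STRING,            -- MySQL: JSON (converted to a JSON string)
+  PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'hostname' = 'localhost',
+  'port' = '3306',
+  'username' = 'inlong_user',
+  'password' = 'inlong_pw',
+  'database-name' = 'test_db',
+  'table-name' = 'typed_source'
+);
+```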
## Features
@@ -662,3 +261,53 @@ WITH (
'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*'
)
````
+
+### Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for the MySQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
+- `earliest-offset`: Skips the snapshot phase and starts reading binlog events from the earliest accessible binlog offset.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup, and just reads from the end of the binlog, which means it only captures changes made after the connector was started.
+- `specific-offset`: Skips the snapshot phase and starts reading binlog events from a specific offset. The offset can be specified with a binlog filename and position, or a GTID set if GTID is enabled on the server.
+- `timestamp`: Skips the snapshot phase and starts reading binlog events from a specific timestamp.
+
+For example, in the DataStream API:
+```java
+MySQLSource.builder()
+    // Set exactly one of the following startup options; later calls override earlier ones
+    .startupOptions(StartupOptions.earliest()) // Start from earliest offset
+    .startupOptions(StartupOptions.latest()) // Start from latest offset
+    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // Start from binlog file and offset
+    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // Start from GTID set
+    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from timestamp
+    ...
+    .build()
+```
+
+and with SQL:
+
+```SQL
+CREATE TABLE mysql_source (...) WITH (
+  'connector' = 'mysql-cdc-inlong',
+ 'scan.startup.mode' = 'earliest-offset', -- Start from earliest offset
+ 'scan.startup.mode' = 'latest-offset', -- Start from latest offset
+ 'scan.startup.mode' = 'specific-offset', -- Start from specific offset
+ 'scan.startup.mode' = 'timestamp', -- Start from timestamp
+ 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog
filename under specific offset startup mode
+ 'scan.startup.specific-offset.pos' = '4', -- Binlog position under
specific offset mode
+ 'scan.startup.specific-offset.gtid-set' =
'24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific offset
startup mode
+ 'scan.startup.timestamp-millis' = '1667232000000' -- Timestamp under
timestamp startup mode
+ ...
+)
+```
+
+:::caution
+
+- The MySQL source will print the current binlog position into the logs at INFO level on each checkpoint, with the prefix "Binlog offset on checkpoint {checkpoint-id}". This can be useful if you want to restart the job from a specific checkpointed position.
+- If the schema of the captured tables was changed previously, starting from the earliest offset, a specific offset, or a timestamp could fail, as the Debezium reader keeps the current latest table schema internally, and earlier records with an unmatched schema cannot be correctly parsed.
+
+:::
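+
+For example, to resume a job from a binlog position previously printed in the checkpoint logs, the offset can be passed back through the specific-offset options (the file name and position below are placeholders):
+
+```sql
+CREATE TABLE mysql_source (...) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'scan.startup.mode' = 'specific-offset',
+  'scan.startup.specific-offset.file' = 'mysql-bin.000042', -- from the checkpoint log
+  'scan.startup.specific-offset.pos' = '1562'               -- from the checkpoint log
+)
+```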
\ No newline at end of file
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
index d53f3e8572..4a9aa55e4d 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
@@ -114,294 +114,57 @@ TODO: 将在未来支持此功能。
## MySQL Extract 节点参数
-<div class="highlight">
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left" style={{width: '10%'}}>参数</th>
- <th class="text-left" style={{width: '8%'}}>是否必须</th>
- <th class="text-left" style={{width: '7%'}}>默认值</th>
- <th class="text-left" style={{width: '10%'}}>数据类型</th>
- <th class="text-left" style={{width: '65%'}}>描述</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>connector</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>指定要使用的连接器,这里应该是 <code>'mysql-cdc-inlong'</code>。</td>
- </tr>
- <tr>
- <td>hostname</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>MySQL 数据库服务器的 IP 地址或主机名。</td>
- </tr>
- <tr>
- <td>username</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>连接到 MySQL 数据库服务器时要使用的 MySQL 用户名称。</td>
- </tr>
- <tr>
- <td>password</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>连接到 MySQL 数据库服务器时使用的密码。</td>
- </tr>
- <tr>
- <td>database-name</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>要监控的 MySQL 服务器的数据库名称。 database-name 还支持正则表达式来监控多个表是否匹配正则表达式。</td>
- </tr>
- <tr>
- <td>table-name</td>
- <td>required</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>要监控的 MySQL 数据库的表名。 table-name 还支持正则表达式来监控多个表是否匹配正则表达式。</td>
- </tr>
- <tr>
- <td>port</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>3306</td>
- <td>Integer</td>
- <td>MySQL 数据库服务器的整数端口号。</td>
- </tr>
- <tr>
- <td>server-id</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>Integer</td>
- <td>此数据库客户端的数字 Id 或数字 Id 范围,数字 Id 语法类似于 '5400',
- 数字 Id
范围语法类似于“5400-5408”,启用“scan.incremental.snapshot.enabled”时建议使用数字 Id 范围语法。
- 在 MySQL 集群中所有当前运行的数据库进程中,每个 Id 都必须是唯一的。此连接器加入 MySQL 集群
- 作为另一台服务器(具有此唯一 Id),因此它可以读取 Binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数,
- 尽管我们建议设置一个明确的值。
- </td>
- </tr>
- <tr>
- <td>scan.incremental.snapshot.enabled</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>true</td>
- <td>Boolean</td>
- <td>增量快照是一种读取表快照的新机制。与旧的快照机制相比,
- 增量快照有很多优点,包括:
- (1) 快照读取时 Source 可以并行,
- (2) Source 可以在快照读取时在 Chunk 粒度上进行检查点,
- (3) Source 在读快照前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)。
- 如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,所以
- 'server-id' 必须是 '5400-6400' 这样的范围,并且范围必须大于并行度。
- 请参阅<a
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading">增量快照阅读</a>部分了解更多详细信息。
- </td>
- </tr>
- <tr>
- <td>scan.incremental.snapshot.chunk.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>8096</td>
- <td>Integer</td>
- <td>表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。</td>
- </tr>
- <tr>
- <td>scan.snapshot.fetch.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>1024</td>
- <td>Integer</td>
- <td>读取表快照时每次轮询的最大获取大小。</td>
- </tr>
- <tr>
- <td>scan.startup.mode</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>initial</td>
- <td>String</td>
- <td>MySQL CDC 消费者的可选启动模式,有效枚举为"initial"
- 和"latest-offset"。
- 请参阅<a
href="https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#startup-reading-position">启动阅读位置</a>部分了解更多详细信息。</td>
- </tr>
- <tr>
- <td>server-time-zone</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>UTC</td>
- <td>String</td>
- <td>数据库服务器中的会话时区,例如"Asia/Shanghai"。
- 它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。
- 查看更多<a
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types">这里</a>。</td>
- </tr>
- <tr>
- <td>debezium.min.row.
- count.to.stream.result</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>1000</td>
- <td>Integer</td>
- <td>在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。此参数确定 MySQL
连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为
1,000。将此参数设置为'0'以跳过所有表大小检查并始终在快照期间流式传输所有结果。</td>
- </tr>
- <tr>
- <td>connect.timeout</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>30s</td>
- <td>Duration</td>
- <td>连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。</td>
- </tr>
- <tr>
- <td>connect.max-retries</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>3</td>
- <td>Integer</td>
- <td>连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。</td>
- </tr>
- <tr>
- <td>connection.pool.size</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>20</td>
- <td>Integer</td>
- <td>连接池大小。</td>
- </tr>
- <tr>
- <td>jdbc.properties.*</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>20</td>
- <td>String</td>
- <td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,例如 'jdbc.properties.useSSL' =
'false'。</td>
- </tr>
- <tr>
- <td>heartbeat.interval</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>30s</td>
- <td>Duration</td>
- <td>发送心跳事件的时间间隔,用于跟踪最新可用的 Binlog 偏移量。</td>
- </tr>
- <tr>
- <td>append-mode</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>是否仅支持 Append,如果为 'true',MySQL Extract Node 会将所有 Upsert 流转换为
Append 流,以支持不支持 Upsert 流的下游场景。</td>
- </tr>
- <tr>
- <td>migrate-all</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>是否是全库迁移场景,如果为 'true',MySQL Extract Node 则将表的物理字段和其他元字段压缩成 'json'
- 格式的特殊 'data' 元字段, 目前支持两种 data 格式, 如果需要 'canal json' 格式的数据,
- 则使用 'data_canal' 元数据字段,如果需要使用 'debezium json' 格式的数据则使用
'data_debezium' 元数据字段。</td>
- </tr>
- <tr>
- <td>row-kinds-filtered</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>false</td>
- <td>Boolean</td>
- <td>需要保留的特定的操作类型,其中 +U 对应更新前的数据,-U 对应更新后的数据,+I 对应
- 插入的数据(存量数据为插入类型的数据),-D 代表删除的数据, 如需保留多个操作类型则使用 & 连接。
- 举例 +I&-D,connector 只会输出插入以及删除的数据,更新的数据则不会输出。</td>
- </tr>
- <tr>
- <td>debezium.*</td>
- <td>optional</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>将 Debezium 的属性传递给用于从 MySQL 服务器捕获数据更改的 Debezium Embedded Engine。
- 例如:<code>'debezium.snapshot.mode' = 'never'</code>。
- 详细了解 <a
href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties">Debezium
的 MySQL 连接器属性。</a></td>
- </tr>
- <tr>
- <td>inlong.metric.labels</td>
- <td>可选</td>
- <td style={{wordWrap: 'break-word'}}>(none)</td>
- <td>String</td>
- <td>inlong metric
的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。</td>
- </tr>
- </tbody>
-</table>
-</div>
+| Option | Required | Default | Type | Description |
+| --- | --- | --- | --- | --- |
+| connector | required | (none) | String | Specify what connector to use, here it should be `'mysql-cdc-inlong'`. |
+| hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
+| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server. |
+| password | required | (none) | String | Password to use when connecting to the MySQL database server. |
+| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple databases matching the regular expression. |
+| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matching the regular expression. |
+| port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
+| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'. The range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value. |
+| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism for reading table snapshots. Compared to the old snapshot mechanism, it has many advantages, including: (1) the source can read snapshots in parallel, (2) the source can checkpoint at chunk granularity during snapshot reading, (3) the source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before reading the snapshot. If you want the source to run in parallel, each parallel reader should have a unique server ID, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. See [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) for more details. |
+| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of a table snapshot; the table is split into multiple chunks when its snapshot is read. |
+| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading a table snapshot. |
+| scan.startup.mode | optional | initial | String | Optional startup mode for the MySQL CDC consumer; valid enumerations are 'initial', 'earliest-offset', 'latest-offset', 'specific-offset' and 'timestamp'. See the Startup Mode section below for more details. |
+| scan.startup.specific-offset.file | optional | (none) | String | The binlog file name of the starting offset under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.pos | optional | (none) | Long | The binlog file position of the starting offset under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.gtid-set | optional | (none) | String | The GTID set of the starting offset under 'specific-offset' startup mode. |
+| scan.startup.specific-offset.skip-events | optional | (none) | Long | The number of events to skip after the specified starting offset. |
+| scan.startup.specific-offset.skip-rows | optional | (none) | Long | The number of rows to skip after the specified starting offset. |
+| server-time-zone | optional | UTC | String | The session time zone in the database server, e.g. 'Asia/Shanghai'. It controls how the TIMESTAMP type in MySQL is converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types). |
+| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector queries each included table to produce a read event for every row in that table. This parameter determines whether the MySQL connection pulls all results of a table into memory (fast, but memory-hungry) or streams the results instead (can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector streams results; the default is 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot. |
+| connect.timeout | optional | 30s | Duration | The maximum time the connector should wait after trying to connect to the MySQL database server before timing out. |
+| connect.max-retries | optional | 3 | Integer | The maximum number of retries the connector should make to build a MySQL database server connection. |
+| connection.pool.size | optional | 20 | Integer | The connection pool size. |
+| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. Users can pass custom properties, e.g. 'jdbc.properties.useSSL' = 'false'. |
+| heartbeat.interval | optional | 30s | Duration | The interval of sending heartbeat events, used for tracking the latest available binlog offset. |
+| append-mode | optional | false | Boolean | Whether to run in append-only mode. If 'true', the MySQL Extract Node converts all upsert streams to append streams, to support downstream scenarios that do not accept upsert streams. |
+| migrate-all | optional | false | Boolean | Whether this is a whole-database migration scenario. If 'true', the MySQL Extract Node compresses the physical fields and other meta fields of a table into a special 'data' meta field in 'json' format. Two data formats are currently supported: use the 'data_canal' metadata field for data in 'canal json' format, or the 'data_debezium' metadata field for data in 'debezium json' format. |
+| row-kinds-filtered | optional | false | Boolean | The specific operation types to keep, where -U corresponds to the data before an update, +U to the data after an update, +I to inserted data (existing stock data is of insert type), and -D to deleted data. To keep multiple operation types, join them with &, e.g. with +I&-D the connector only outputs inserted and deleted data, not updated data. |
+| debezium.* | optional | (none) | String | Pass Debezium properties to the Debezium Embedded Engine used to capture data changes from the MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about [Debezium's MySQL connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties). |
+| inlong.metric.labels | optional | (none) | String | The label value of the inlong metric, constructed as groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
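+
+A minimal sketch of how a few of these options combine in practice; the table, connection details and field names here are hypothetical, so adjust them to your environment:
+
+```sql
+CREATE TABLE `mysql_extract_node` (
+    `id` INT,
+    `name` STRING,
+    PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'database-name' = 'test',
+    'table-name' = 'user',
+    -- with incremental snapshotting enabled, use a server-id *range*
+    -- larger than the job parallelism, as recommended above
+    'scan.incremental.snapshot.enabled' = 'true',
+    'server-id' = '5400-5404'
+)
+```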
## Available Metadata Fields
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left" style={{width: '15%'}}>字段名称</th>
- <th class="text-left" style={{width: '30%'}}>数据类型</th>
- <th class="text-left" style={{width: '55%'}}>描述</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>meta.table_name</td>
- <td>STRING NOT NULL</td>
- <td>该行所属的表名。</td>
- </tr>
- <tr>
- <td>meta.database_name</td>
- <td>STRING NOT NULL</td>
- <td>该行所属的数据库名称。</td>
- </tr>
- <tr>
- <td>meta.op_ts</td>
- <td>TIMESTAMP_LTZ(3) NOT NULL</td>
- <td>它指示在数据库中进行更改的时间。 <br/>如果记录是从表的快照而不是binlog中读取的,则该值始终为0。</td>
- </tr>
- <tr>
- <td>meta.op_type</td>
- <td>STRING</td>
- <td>数据库操作的类型,如 INSERT/DELETE 等。</td>
- </tr>
- <tr>
- <td>meta.data_canal</td>
- <td>STRING/BYTES</td>
- <td>`canal-json` 格式化的行的数据只有在 `migrate-all` 选项为 'true' 时才存在。</td>
- </tr>
- <tr>
- <td>meta.data_debezium</td>
- <td>STRING/BYTES</td>
- <td>`debezium-json` 格式化的行的数据只有在 `migrate-all` 选项为 'true' 时才存在。</td>
- </tr>
- <tr>
- <td>meta.is_ddl</td>
- <td>BOOLEAN</td>
- <td>是否是 DDL 语句。</td>
- </tr>
- <tr>
- <td>meta.ts</td>
- <td>TIMESTAMP_LTZ(3) NOT NULL</td>
- <td>接收和处理行的当前时间。</td>
- </tr>
- <tr>
- <td>meta.sql_type</td>
- <td>MAP</td>
- <td>将 Sql_type 表字段映射到 Java 数据类型 Id。</td>
- </tr>
- <tr>
- <td>meta.mysql_type</td>
- <td>MAP</td>
- <td>表的结构。</td>
- </tr>
- <tr>
- <td>meta.pk_names</td>
- <td>ARRAY</td>
- <td>表的主键名称。</td>
- </tr>
- <tr>
- <td>meta.batch_id</td>
- <td>BIGINT</td>
- <td>Binlog的批次id。</td>
- </tr>
- <tr>
- <td>meta.update_before</td>
- <td>ARRAY</td>
- <td>该行更新前的数据。</td>
- </tr>
- </tbody>
-</table>
+| Field Name | Data Type | Description |
+| --- | --- | --- |
+| meta.table_name | STRING NOT NULL | Name of the table that contains the row. |
+| meta.database_name | STRING NOT NULL | Name of the database that contains the row. |
+| meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time when the change was made in the database.<br/>If the record was read from a snapshot of the table instead of the binlog, the value is always 0. |
+| meta.op_type | STRING | Type of the database operation, such as INSERT/DELETE. |
+| meta.data_canal | STRING/BYTES | Data of the row formatted as `canal-json`; only exists when the `migrate-all` option is 'true'. |
+| meta.data_debezium | STRING/BYTES | Data of the row formatted as `debezium-json`; only exists when the `migrate-all` option is 'true'. |
+| meta.is_ddl | BOOLEAN | Whether it is a DDL statement. |
+| meta.ts | TIMESTAMP_LTZ(3) NOT NULL | The current time when the row was received and processed. |
+| meta.sql_type | MAP | Mapping of sql_type table fields to Java data type IDs. |
+| meta.mysql_type | MAP | Structure of the table. |
+| meta.pk_names | ARRAY | Primary key names of the table. |
+| meta.batch_id | BIGINT | Batch id of the binlog. |
+| meta.update_before | ARRAY | Data of the row before the update. |
The extended CREATE TABLE example demonstrates the syntax for using these metadata fields:
@@ -437,198 +200,32 @@ CREATE TABLE `mysql_extract_node` (
## Data Type Mapping
-<div class="wy-table-responsive">
-<table class="colwidths-auto docutils">
- <thead>
- <tr>
- <th class="text-left">MySQL type</th>
- <th class="text-left">Flink SQL type</th>
- <th class="text-left">NOTE</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>TINYINT</td>
- <td>TINYINT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- SMALLINT<br/>
- TINYINT UNSIGNED</td>
- <td>SMALLINT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- INT<br/>
- MEDIUMINT<br/>
- SMALLINT UNSIGNED</td>
- <td>INT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BIGINT<br/>
- INT UNSIGNED</td>
- <td>BIGINT</td>
- <td></td>
- </tr>
- <tr>
- <td>BIGINT UNSIGNED</td>
- <td>DECIMAL(20, 0)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- REAL<br/>
- FLOAT<br/>
- </td>
- <td>FLOAT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- DOUBLE
- </td>
- <td>DOUBLE</td>
- <td></td>
- </tr>
- <tr>
- <td>
- NUMERIC(p, s)<br/>
- DECIMAL(p, s)<br/>
- where p <= 38<br/>
- </td>
- <td>DECIMAL(p, s)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- NUMERIC(p, s)<br/>
- DECIMAL(p, s)<br/>
- where 38 < p <= 65<br/>
- </td>
- <td>STRING</td>
- <td>The precision for DECIMAL data type is up to 65 in MySQL, but the
precision for DECIMAL is limited to 38 in Flink.
- So if you define a decimal column whose precision is greater than 38, you
should map it to STRING to avoid precision loss.</td>
- </tr>
- <tr>
- <td>
- BOOLEAN<br/>
- TINYINT(1)<br/>
- BIT(1)
- </td>
- <td>BOOLEAN</td>
- <td></td>
- </tr>
- <tr>
- <td>DATE</td>
- <td>DATE</td>
- <td></td>
- </tr>
- <tr>
- <td>TIME [(p)]</td>
- <td>TIME [(p)]</td>
- <td></td>
- </tr>
- <tr>
- <td>TIMESTAMP [(p)]<br/>
- DATETIME [(p)]
- </td>
- <td>TIMESTAMP [(p)]
- </td>
- <td></td>
- </tr>
- <tr>
- <td>
- CHAR(n)
- </td>
- <td>CHAR(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- VARCHAR(n)
- </td>
- <td>VARCHAR(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BIT(n)
- </td>
- <td>BINARY(⌈n/8⌉)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- BINARY(n)
- </td>
- <td>BINARY(n)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- VARBINARY(N)
- </td>
- <td>VARBINARY(N)</td>
- <td></td>
- </tr>
- <tr>
- <td>
- TINYTEXT<br/>
- TEXT<br/>
- MEDIUMTEXT<br/>
- LONGTEXT<br/>
- </td>
- <td>STRING</td>
- <td></td>
- </tr>
- <tr>
- <td>
- TINYBLOB<br/>
- BLOB<br/>
- MEDIUMBLOB<br/>
- LONGBLOB<br/>
- </td>
- <td>BYTES</td>
- <td>Currently, for BLOB data type in MySQL, only the blob whose length
isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. </td>
- </tr>
- <tr>
- <td>
- YEAR
- </td>
- <td>INT</td>
- <td></td>
- </tr>
- <tr>
- <td>
- ENUM
- </td>
- <td>STRING</td>
- <td></td>
- </tr>
- <tr>
- <td>
- JSON
- </td>
- <td>STRING</td>
- <td>The JSON data type will be converted into STRING with JSON format
in Flink.</td>
- </tr>
- <tr>
- <td>
- SET
- </td>
- <td>ARRAY<STRING></td>
- <td>As the SET data type in MySQL is a string object that can have zero
or more values,
- it should always be mapped to an array of string
- </td>
- </tr>
- </tbody>
-</table>
-</div>
-
+| MySQL type | Flink SQL type | NOTE |
+| --- | --- | --- |
+| TINYINT | TINYINT | |
+| SMALLINT<br/>TINYINT UNSIGNED | SMALLINT | |
+| INT<br/>MEDIUMINT<br/>SMALLINT UNSIGNED | INT | |
+| BIGINT<br/>INT UNSIGNED | BIGINT | |
+| BIGINT UNSIGNED | DECIMAL(20, 0) | |
+| REAL<br/>FLOAT | FLOAT | |
+| DOUBLE | DOUBLE | |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where p <= 38 | DECIMAL(p, s) | |
+| NUMERIC(p, s)<br/>DECIMAL(p, s)<br/>where 38 < p <= 65 | STRING | The precision of the DECIMAL data type is up to 65 in MySQL, but the precision of DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
+| BOOLEAN<br/>TINYINT(1)<br/>BIT(1) | BOOLEAN | |
+| DATE | DATE | |
+| TIME [(p)] | TIME [(p)] | |
+| TIMESTAMP [(p)]<br/>DATETIME [(p)] | TIMESTAMP [(p)] | |
+| CHAR(n) | CHAR(n) | |
+| VARCHAR(n) | VARCHAR(n) | |
+| BIT(n) | BINARY(⌈n/8⌉) | |
+| BINARY(n) | BINARY(n) | |
+| VARBINARY(N) | VARBINARY(N) | |
+| TINYTEXT<br/>TEXT<br/>MEDIUMTEXT<br/>LONGTEXT | STRING | |
+| TINYBLOB<br/>BLOB<br/>MEDIUMBLOB<br/>LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length is not greater than 2,147,483,647 (2\*\*31 - 1) are supported. |
+| YEAR | INT | |
+| ENUM | STRING | |
+| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
+| SET | ARRAY&lt;STRING&gt; | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
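+
+As a hypothetical sketch of the mapping above, a MySQL column `amount DECIMAL(60, 2)` (precision 60 > 38) and a column `tags SET('a','b')` would be declared on the Flink side as:
+
+```sql
+CREATE TABLE `t` (
+    `amount` STRING,         -- DECIMAL(60, 2): precision exceeds 38, map to STRING
+    `tags` ARRAY<STRING>     -- SET: always mapped to an array of strings
+) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    'hostname' = 'localhost',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'database-name' = 'test',
+    'table-name' = 't'
+)
+```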
## Features
@@ -659,4 +256,51 @@ WITH (
'password' = 'inlong',
'table-name' = 'test01\.a{2}[0-9]$, test\.[\s\S]*'
)
-```
\ No newline at end of file
+```
+
+### Startup Mode
+
+The config option `scan.startup.mode` specifies the startup mode for the MySQL CDC consumer. Valid enumerations include:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
+- `earliest-offset`: Skips the snapshot phase and starts reading from the earliest accessible binlog offset.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup. The connector reads only from the end of the binlog, which means it can capture only the changes made after the connector starts.
+- `specific-offset`: Skips the snapshot phase and starts reading binlog events from a specific offset, specified either by a binlog file name and position, or by a GTID set when GTID is enabled on the cluster.
+- `timestamp`: Skips the snapshot phase and starts reading binlog events from a specific timestamp.
+
+For example, with the DataStream API:
+```java
+MySQLSource.builder()
+    // Pick exactly one of the following startup options:
+    .startupOptions(StartupOptions.earliest()) // Start from the earliest offset
+    .startupOptions(StartupOptions.latest()) // Start from the latest offset
+    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L)) // Start from a specific binlog file name and position
+    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // Start from a GTID set
+    .startupOptions(StartupOptions.timestamp(1667232000000L)) // Start from a timestamp
+    ...
+    .build()
+```
+
+With SQL:
+
+```SQL
+CREATE TABLE mysql_source (...) WITH (
+    'connector' = 'mysql-cdc-inlong',
+    -- Pick exactly one of the following startup modes:
+    'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset
+    'scan.startup.mode' = 'latest-offset', -- Start from the latest offset
+    'scan.startup.mode' = 'specific-offset', -- Start from a specific offset
+    'scan.startup.mode' = 'timestamp', -- Start from a timestamp
+    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binlog file name under specific-offset mode
+    'scan.startup.specific-offset.pos' = '4', -- Binlog position under specific-offset mode
+    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific-offset mode
+    'scan.startup.timestamp-millis' = '1667232000000' -- Startup timestamp under timestamp mode
+    ...
+)
+```
+
+:::caution
+
+- On each checkpoint, the MySQL source prints the current binlog offset to the log at INFO level, prefixed with "Binlog offset on checkpoint {checkpoint-id}". This log entry can help when you want to restart a job from the offset of a given checkpoint.
+- If schema changes have ever happened on the captured tables, starting from the earliest offset, a specific offset, or a timestamp may fail, because the Debezium reader keeps only the current latest table schema internally, so earlier data with a mismatched schema cannot be parsed correctly.
+
+:::
\ No newline at end of file