This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch new-official-website
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git
The following commit(s) were added to refs/heads/new-official-website by this push:
new 09fec4e4b add new connect doc (#349)
09fec4e4b is described below
commit 09fec4e4bd4e10c625812ab5646f55518d179228
Author: zhoubo <[email protected]>
AuthorDate: Thu Oct 20 17:23:25 2022 +0800
add new connect doc (#349)
---
.../28RocketMQ Connect In Action1.md" | 11 +-
.../29RocketMQ Connect In Action2.md" | 9 +-
.../30RocketMQ Connect In Action3.md" | 158 +++++++++++++++------
.../RocketMQ-Connect-Integration-Demo.jpg | Bin 0 -> 342502 bytes
.../25RocketMQ Connect Overview.md" | 8 +-
.../26RocketMQ Connect Concept.md" | 12 +-
.../27RocketMQ Connect Quick Start.md" | 2 +-
.../28RocketMQ Connect In Action1.md" | 13 +-
.../29RocketMQ Connect In Action2.md" | 11 +-
.../30RocketMQ Connect In Action3.md" | 158 +++++++++++++++------
.../RocketMQ-Connect-Integration-Demo.jpg | Bin 0 -> 342502 bytes
.../25RocketMQ Connect Overview.md" | 8 +-
.../26RocketMQ Connect Concept.md" | 12 +-
.../27RocketMQ Connect Quick Start.md" | 2 +-
.../28RocketMQ Connect In Action1.md" | 13 +-
.../29RocketMQ Connect In Action2.md" | 11 +-
.../30RocketMQ Connect In Action3.md" | 158 +++++++++++++++------
.../RocketMQ-Connect-Integration-Demo.jpg | Bin 0 -> 342502 bytes
.../25RocketMQ Connect Overview.md" | 6 +-
.../26RocketMQ Connect Concept.md" | 4 +-
.../28RocketMQ Connect In Action1.md" | 11 +-
.../29RocketMQ Connect In Action2.md" | 9 +-
.../30RocketMQ Connect In Action3.md" | 158 +++++++++++++++------
.../RocketMQ-Connect-Integration-Demo.jpg | Bin 0 -> 342502 bytes
24 files changed, 523 insertions(+), 251 deletions(-)
diff --git
"a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect
In Action1.md"
"b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect
In Action1.md"
index fbd1ba415..b0ed8e0ae 100644
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
+++ "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
@@ -95,7 +95,7 @@ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL信息
@@ -142,9 +142,6 @@ INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02
INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-
-
-use
```
目标库:inventory_2.employee
@@ -173,10 +170,10 @@ PRIMARY KEY (`id`)
作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
-"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
"max.task": "1",
-"connect.topicname": "debezium-MySQL-source-topic",
+"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
diff --git
"a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect
In Action2.md"
"b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect
In Action2.md"
index 5abc1c2bf..4f3ae9848 100644
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++ "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
@@ -27,11 +27,11 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
@@ -107,14 +107,15 @@ Postgres信息
端口:5432
账号:start_data_engineer/password
同步的源数据库:bank.holding
-目标库:bank1.holding1
+目标库:bank1.holding
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ Connect
In Action3.md"
similarity index 52%
copy from
"i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
copy to "docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
index 6404152c6..de816379b 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++ "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
@@ -1,6 +1,6 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战3
-PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
+
## 准备
@@ -27,14 +27,17 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium MySQL PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
+cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
```
JDBC Connector
@@ -106,61 +109,94 @@ docker exec -ti postgres /bin/bash
Postgres信息
端口:5432
账号:start_data_engineer/password
-同步的源数据库:bank.holding
-目标库:bank1.holding1
+同步的源数据库:bank.user
+
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
账号:root/debezium
+同步的源数据库:bank.user
+
+目标库:bank1.user
### 测试数据
-通过start_data_engineer/password账号登录数据库
+通过root/debezium账号登录数据库
+源数据库表:bank.user
-源数据库表:bank.holding
+```
+create database bank;
+use bank;
+
+create table bank.user
+(
+ id bigint NOT NULL AUTO_INCREMENT,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height decimal(11, 2) null,
+ PRIMARY KEY (`id`)
+);
+
+insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1003 where id = 1003;
+
+```
+
+
+通过start_data_engineer/password账号登录PostgreSQL数据库
+
+源数据库表:bank.user
```
CREATE SCHEMA bank;
SET search_path TO bank,public;
-CREATE TABLE bank.holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created timestamp,
- datetime_updated timestamp,
- primary key(holding_id)
+create table bank.user
+(
+ id integer not null
+ constraint user_pkey
+ primary key,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height numeric(11, 2)
);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-\q
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
+
+insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1001 where id = 1001;
```
-目标表:bank1.holding
+目标表:bank1.user
```
create database bank1;
-CREATE TABLE holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created bigint,
- datetime_updated bigint,
- primary key(holding_id)
+create table bank1.user
+(
+ id bigint auto_increment
+ primary key,
+ user_id int null,
+ name varchar(8) null,
+ age int null,
+ birthday date null,
+ datetime_created timestamp(3) null,
+ datetime_updated timestamp(3) null,
+ height decimal(11, 2) null
);
```
@@ -169,14 +205,47 @@ CREATE TABLE holding (
### 启动Debezium source connector
-同步原表数据:bank.holding
+同步原表数据:bank.user
+作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
+"max.task": "1",
+"connect.topicname": "debezium-source-topic1000",
+"kafka.transforms": "Unwrap",
+"kafka.transforms.Unwrap.delete.handling.mode": "none",
+"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+"database.history.skip.unparseable.ddl": true,
+"database.history.name.srv.addr": "localhost:9876",
+"database.history.rocketmq.topic": "db-history-debezium-topic1000",
+"database.history.store.only.monitored.tables.ddl": true,
+"include.schema.changes": false,
+"database.server.name": "dbserver1",
+"database.port": 3306,
+"database.hostname": "数据库ip",
+"database.connectionTimeZone": "UTC",
+"database.user": "debezium",
+"database.password": "dbz",
+"table.include.list": "bank.user",
+"max.batch.size": 50,
+"database.include.list": "bank",
+"snapshot.mode": "when_needed",
+"database.server.id": "184054",
+"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+```
+
+同步原表数据:bank.user
作用:通过解析Postgres binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
- "connect.topicname": "debezium-postgres-source-01",
+ "connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
@@ -189,7 +258,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
- "table.whitelist": "bank.holding",
+ "table.whitelist": "bank.user",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
@@ -200,14 +269,14 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
作用:通过消费Topic中的数据,通过JDBC协议写入到目标表当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
- "connect.topicnames": "debezium-postgres-source-01",
+ "connect.topicnames": "debezium-source-topic1000",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
- "pk.fields": "holding_id",
+ "pk.fields": "id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
@@ -223,11 +292,12 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
-以上两个Connector任务创建成功以后
-通过start_data_engineer/password账号登录数据库
+以上三个Connector任务创建成功以后
+通过start_data_engineer/password账号登录PostgreSQL数据库
账号登录数据库
+或者通过root/debezium账号登录MySQL数据库
-对源数据库表:bankholding增删改
-即可同步到目标表bank1.holding
+对源数据库表:bank.user增删改
+都会同步到同步到目标表MySQL bank1.user
diff --git
a/docs/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
b/docs/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
new file mode 100644
index 000000000..06e944994
Binary files /dev/null and
b/docs/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg differ
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
index 03d4216d9..4021e99dd 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
@@ -1,7 +1,7 @@
-# RocketMQ Connect Overview
+# RocketMQ Connect 概览
RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统,
-它具备低延时,高靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。
+它具备低延时,高可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。

@@ -12,13 +12,13 @@ RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,
### Connector的使用场景
-#####构建流式数据管道
+##### 构建流式数据管道

在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。
-#####CDC
+##### CDC
CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
index 53c852585..501455350 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
@@ -1,4 +1,4 @@
-# Concept
+# 概念
## Connector
@@ -9,20 +9,20 @@
是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。
-
+
通过Connect的Api也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。
通过下面的两张图可以清楚的看到,Connecotr和Task处理基本流程。
-
+
## Worker
-worker 进程是Connector和Task运行环境,它提供RESTFull能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
-除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负责均衡能力实现的。
+worker 进程是Connector和Task运行环境,它提供RESTFul能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
+除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负载均衡能力实现的。
-
+
从上面面这张图,看到Worker通过提供的REST Api接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
index 1db8ccaeb..31c958953 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
@@ -1,4 +1,4 @@
-# Quick Start
+# 快速开始
[](https://www.apache.org/licenses/LICENSE-2.0.html)
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
index 43de0359e..b0ed8e0ae 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
@@ -1,4 +1,4 @@
-# RocketMQ Connect In Action1
+# RocketMQ Connect实战1
MySQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
@@ -95,7 +95,7 @@ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL信息
@@ -142,9 +142,6 @@ INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02
INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-
-
-use
```
目标库:inventory_2.employee
@@ -173,10 +170,10 @@ PRIMARY KEY (`id`)
作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
-"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
"max.task": "1",
-"connect.topicname": "debezium-MySQL-source-topic",
+"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
index 6404152c6..4f3ae9848 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
@@ -1,4 +1,4 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战2
PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
@@ -27,11 +27,11 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
@@ -107,14 +107,15 @@ Postgres信息
端口:5432
账号:start_data_engineer/password
同步的源数据库:bank.holding
-目标库:bank1.holding1
+目标库:bank1.holding
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
similarity index 52%
copy from
"i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
copy to
"i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
index 6404152c6..de816379b 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
@@ -1,6 +1,6 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战3
-PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
+
## 准备
@@ -27,14 +27,17 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium MySQL PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
+cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
```
JDBC Connector
@@ -106,61 +109,94 @@ docker exec -ti postgres /bin/bash
Postgres信息
端口:5432
账号:start_data_engineer/password
-同步的源数据库:bank.holding
-目标库:bank1.holding1
+同步的源数据库:bank.user
+
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
账号:root/debezium
+同步的源数据库:bank.user
+
+目标库:bank1.user
### 测试数据
-通过start_data_engineer/password账号登录数据库
+通过root/debezium账号登录数据库
+源数据库表:bank.user
-源数据库表:bank.holding
+```
+create database bank;
+use bank;
+
+create table bank.user
+(
+ id bigint NOT NULL AUTO_INCREMENT,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height decimal(11, 2) null,
+ PRIMARY KEY (`id`)
+);
+
+insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1003 where id = 1003;
+
+```
+
+
+通过start_data_engineer/password账号登录PostgreSQL数据库
+
+源数据库表:bank.user
```
CREATE SCHEMA bank;
SET search_path TO bank,public;
-CREATE TABLE bank.holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created timestamp,
- datetime_updated timestamp,
- primary key(holding_id)
+create table bank.user
+(
+ id integer not null
+ constraint user_pkey
+ primary key,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height numeric(11, 2)
);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-\q
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
+
+insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1001 where id = 1001;
```
-目标表:bank1.holding
+目标表:bank1.user
```
create database bank1;
-CREATE TABLE holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created bigint,
- datetime_updated bigint,
- primary key(holding_id)
+create table bank1.user
+(
+ id bigint auto_increment
+ primary key,
+ user_id int null,
+ name varchar(8) null,
+ age int null,
+ birthday date null,
+ datetime_created timestamp(3) null,
+ datetime_updated timestamp(3) null,
+ height decimal(11, 2) null
);
```
@@ -169,14 +205,47 @@ CREATE TABLE holding (
### 启动Debezium source connector
-同步原表数据:bank.holding
+同步原表数据:bank.user
+作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
+"max.task": "1",
+"connect.topicname": "debezium-source-topic1000",
+"kafka.transforms": "Unwrap",
+"kafka.transforms.Unwrap.delete.handling.mode": "none",
+"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+"database.history.skip.unparseable.ddl": true,
+"database.history.name.srv.addr": "localhost:9876",
+"database.history.rocketmq.topic": "db-history-debezium-topic1000",
+"database.history.store.only.monitored.tables.ddl": true,
+"include.schema.changes": false,
+"database.server.name": "dbserver1",
+"database.port": 3306,
+"database.hostname": "数据库ip",
+"database.connectionTimeZone": "UTC",
+"database.user": "debezium",
+"database.password": "dbz",
+"table.include.list": "bank.user",
+"max.batch.size": 50,
+"database.include.list": "bank",
+"snapshot.mode": "when_needed",
+"database.server.id": "184054",
+"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+```
+
+同步原表数据:bank.user
作用:通过解析Postgres binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
- "connect.topicname": "debezium-postgres-source-01",
+ "connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
@@ -189,7 +258,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
- "table.whitelist": "bank.holding",
+ "table.whitelist": "bank.user",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
@@ -200,14 +269,14 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
作用:通过消费Topic中的数据,通过JDBC协议写入到目标表当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
- "connect.topicnames": "debezium-postgres-source-01",
+ "connect.topicnames": "debezium-source-topic1000",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
- "pk.fields": "holding_id",
+ "pk.fields": "id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
@@ -223,11 +292,12 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
-以上两个Connector任务创建成功以后
-通过start_data_engineer/password账号登录数据库
+以上三个Connector任务创建成功以后
+通过start_data_engineer/password账号登录PostgreSQL数据库
账号登录数据库
+或者通过root/debezium账号登录MySQL数据库
-对源数据库表:bankholding增删改
-即可同步到目标表bank1.holding
+对源数据库表:bank.user增删改
+都会同步到同步到目标表MySQL bank1.user
diff --git
a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
new file mode 100644
index 000000000..06e944994
Binary files /dev/null and
b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
differ
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
index 03d4216d9..4021e99dd 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
@@ -1,7 +1,7 @@
-# RocketMQ Connect Overview
+# RocketMQ Connect 概览
RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统,
-它具备低延时,高靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。
+它具备低延时,高可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。

@@ -12,13 +12,13 @@ RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,
### Connector的使用场景
-#####构建流式数据管道
+##### 构建流式数据管道

在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。
-#####CDC
+##### CDC
CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
index 53c852585..501455350 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
@@ -1,4 +1,4 @@
-# Concept
+# 概念
## Connector
@@ -9,20 +9,20 @@
是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。
-
+
通过Connect的Api也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。
通过下面的两张图可以清楚的看到,Connecotr和Task处理基本流程。
-
+
## Worker
-worker 进程是Connector和Task运行环境,它提供RESTFull能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
-除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负责均衡能力实现的。
+worker 进程是Connector和Task运行环境,它提供RESTFul能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
+除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负载均衡能力实现的。
-
+
从上面面这张图,看到Worker通过提供的REST Api接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
index 1db8ccaeb..31c958953 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ
Connect Quick Start.md"
@@ -1,4 +1,4 @@
-# Quick Start
+# 快速开始
[](https://www.apache.org/licenses/LICENSE-2.0.html)
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
index 43de0359e..b0ed8e0ae 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
@@ -1,4 +1,4 @@
-# RocketMQ Connect In Action1
+# RocketMQ Connect实战1
MySQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
@@ -95,7 +95,7 @@ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL信息
@@ -142,9 +142,6 @@ INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02
INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-
-
-use
```
目标库:inventory_2.employee
@@ -173,10 +170,10 @@ PRIMARY KEY (`id`)
作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
-"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
"max.task": "1",
-"connect.topicname": "debezium-MySQL-source-topic",
+"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
index 6404152c6..4f3ae9848 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
@@ -1,4 +1,4 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战2
PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
@@ -27,11 +27,11 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
@@ -107,14 +107,15 @@ Postgres信息
端口:5432
账号:start_data_engineer/password
同步的源数据库:bank.holding
-目标库:bank1.holding1
+目标库:bank1.holding
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
similarity index 52%
copy from
"i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
copy to
"i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
index 6404152c6..de816379b 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
@@ -1,6 +1,6 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战3
-PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
+
## 准备
@@ -27,14 +27,17 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium MySQL PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
+cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
```
JDBC Connector
@@ -106,61 +109,94 @@ docker exec -ti postgres /bin/bash
Postgres信息
端口:5432
账号:start_data_engineer/password
-同步的源数据库:bank.holding
-目标库:bank1.holding1
+同步的源数据库:bank.user
+
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
账号:root/debezium
+同步的源数据库:bank.user
+
+目标库:bank1.user
### 测试数据
-通过start_data_engineer/password账号登录数据库
+通过root/debezium账号登录数据库
+源数据库表:bank.user
-源数据库表:bank.holding
+```
+create database bank;
+use bank;
+
+create table bank.user
+(
+ id bigint NOT NULL AUTO_INCREMENT,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height decimal(11, 2) null,
+ PRIMARY KEY (`id`)
+);
+
+insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1003 where id = 1003;
+
+```
+
+
+通过start_data_engineer/password账号登录PostgreSQL数据库
+
+源数据库表:bank.user
```
CREATE SCHEMA bank;
SET search_path TO bank,public;
-CREATE TABLE bank.holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created timestamp,
- datetime_updated timestamp,
- primary key(holding_id)
+create table bank.user
+(
+ id integer not null
+ constraint user_pkey
+ primary key,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height numeric(11, 2)
);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-\q
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
+
+insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1001 where id = 1001;
```
-目标表:bank1.holding
+目标表:bank1.user
```
create database bank1;
-CREATE TABLE holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created bigint,
- datetime_updated bigint,
- primary key(holding_id)
+create table bank1.user
+(
+ id bigint auto_increment
+ primary key,
+ user_id int null,
+ name varchar(8) null,
+ age int null,
+ birthday date null,
+ datetime_created timestamp(3) null,
+ datetime_updated timestamp(3) null,
+ height decimal(11, 2) null
);
```
@@ -169,14 +205,47 @@ CREATE TABLE holding (
### 启动Debezium source connector
-同步原表数据:bank.holding
+同步原表数据:bank.user
+作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
+"max.task": "1",
+"connect.topicname": "debezium-source-topic1000",
+"kafka.transforms": "Unwrap",
+"kafka.transforms.Unwrap.delete.handling.mode": "none",
+"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+"database.history.skip.unparseable.ddl": true,
+"database.history.name.srv.addr": "localhost:9876",
+"database.history.rocketmq.topic": "db-history-debezium-topic1000",
+"database.history.store.only.monitored.tables.ddl": true,
+"include.schema.changes": false,
+"database.server.name": "dbserver1",
+"database.port": 3306,
+"database.hostname": "数据库ip",
+"database.connectionTimeZone": "UTC",
+"database.user": "debezium",
+"database.password": "dbz",
+"table.include.list": "bank.user",
+"max.batch.size": 50,
+"database.include.list": "bank",
+"snapshot.mode": "when_needed",
+"database.server.id": "184054",
+"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+```
+
+同步原表数据:bank.user
作用:通过解析Postgres binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
- "connect.topicname": "debezium-postgres-source-01",
+ "connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
@@ -189,7 +258,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
- "table.whitelist": "bank.holding",
+ "table.whitelist": "bank.user",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
@@ -200,14 +269,14 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
作用:通过消费Topic中的数据,通过JDBC协议写入到目标表当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
- "connect.topicnames": "debezium-postgres-source-01",
+ "connect.topicnames": "debezium-source-topic1000",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
- "pk.fields": "holding_id",
+ "pk.fields": "id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
@@ -223,11 +292,12 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
-以上两个Connector任务创建成功以后
-通过start_data_engineer/password账号登录数据库
+以上三个Connector任务创建成功以后
+通过start_data_engineer/password账号登录PostgreSQL数据库
账号登录数据库
+或者通过root/debezium账号登录MySQL数据库
-对源数据库表:bankholding增删改
-即可同步到目标表bank1.holding
+对源数据库表:bank.user增删改
+都会同步到同步到目标表MySQL bank1.user
diff --git
a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
new file mode 100644
index 000000000..06e944994
Binary files /dev/null and
b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
differ
diff --git
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
index 4324b210a..4021e99dd 100644
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
+++
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ
Connect Overview.md"
@@ -1,7 +1,7 @@
# RocketMQ Connect 概览
RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统,
-它具备低延时,高靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。
+它具备低延时,高可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。

@@ -12,13 +12,13 @@ RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,
### Connector的使用场景
-#####构建流式数据管道
+##### 构建流式数据管道

在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。
-#####CDC
+##### CDC
CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。
diff --git
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
index f0981b4c8..501455350 100644
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
+++
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ
Connect Concept.md"
@@ -20,8 +20,8 @@
## Worker
-worker 进程是Connector和Task运行环境,它提供RESTFull能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
-除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负责均衡能力实现的。
+worker 进程是Connector和Task运行环境,它提供RESTFul能力,接受HTTP请求,将获取到的配置传递给Connector和Task。
+除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负载均衡能力实现的。

diff --git
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
index fbd1ba415..b0ed8e0ae 100644
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
+++
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ
Connect In Action1.md"
@@ -95,7 +95,7 @@ sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL信息
@@ -142,9 +142,6 @@ INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02
INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-
-
-use
```
目标库:inventory_2.employee
@@ -173,10 +170,10 @@ PRIMARY KEY (`id`)
作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl-X POST-H"Content-Type: application/json"http: //127.0.0.1:8082/connectors/MySQLCDCSource'{
-"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector",
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
"max.task": "1",
-"connect.topicname": "debezium-MySQL-source-topic",
+"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
diff --git
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
index 5abc1c2bf..4f3ae9848 100644
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
@@ -27,11 +27,11 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
@@ -107,14 +107,15 @@ Postgres信息
端口:5432
账号:start_data_engineer/password
同步的源数据库:bank.holding
-目标库:bank1.holding1
+目标库:bank1.holding
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
diff --git
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
similarity index 52%
copy from
"i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
copy to
"versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
index 6404152c6..de816379b 100644
---
"a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ
Connect In Action2.md"
+++
"b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/30RocketMQ
Connect In Action3.md"
@@ -1,6 +1,6 @@
-# RocketMQ Connect In Action2
+# RocketMQ Connect实战3
-PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
+
## 准备
@@ -27,14 +27,17 @@ PostgreSQL Source(CDC) - >RocketMQ Connect -> MySQL Sink(JDBC)
Debezium RocketMQ Connector
```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
+$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```
-将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
+将 Debezium MySQL PostgreSQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
+cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
+
```
JDBC Connector
@@ -106,61 +109,94 @@ docker exec -ti postgres /bin/bash
Postgres信息
端口:5432
账号:start_data_engineer/password
-同步的源数据库:bank.holding
-目标库:bank1.holding1
+同步的源数据库:bank.user
+
### MySQL镜像
使用debezium的MySQL docker搭建环境MySQL数据库
```
-docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9
+docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
+
MySQL信息
端口:3306
账号:root/debezium
+同步的源数据库:bank.user
+
+目标库:bank1.user
### 测试数据
-通过start_data_engineer/password账号登录数据库
+通过root/debezium账号登录数据库
+源数据库表:bank.user
-源数据库表:bank.holding
+```
+create database bank;
+use bank;
+
+create table bank.user
+(
+ id bigint NOT NULL AUTO_INCREMENT,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height decimal(11, 2) null,
+ PRIMARY KEY (`id`)
+);
+
+insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1003 where id = 1003;
+
+```
+
+
+通过start_data_engineer/password账号登录PostgreSQL数据库
+
+源数据库表:bank.user
```
CREATE SCHEMA bank;
SET search_path TO bank,public;
-CREATE TABLE bank.holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created timestamp,
- datetime_updated timestamp,
- primary key(holding_id)
+create table bank.user
+(
+ id integer not null
+ constraint user_pkey
+ primary key,
+ user_id integer,
+ name varchar(8),
+ age integer,
+ birthday date,
+ datetime_created timestamp(3),
+ datetime_updated timestamp(3),
+ height numeric(11, 2)
);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-\q
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
+
+insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
+update bank.user set user_id = 1001 where id = 1001;
```
-目标表:bank1.holding
+目标表:bank1.user
```
create database bank1;
-CREATE TABLE holding (
- holding_id int,
- user_id int,
- holding_stock varchar(8),
- holding_quantity int,
- datetime_created bigint,
- datetime_updated bigint,
- primary key(holding_id)
+create table bank1.user
+(
+ id bigint auto_increment
+ primary key,
+ user_id int null,
+ name varchar(8) null,
+ age int null,
+ birthday date null,
+ datetime_created timestamp(3) null,
+ datetime_updated timestamp(3) null,
+ height decimal(11, 2) null
);
```
@@ -169,14 +205,47 @@ CREATE TABLE holding (
### 启动Debezium source connector
-同步原表数据:bank.holding
+同步原表数据:bank.user
+作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
+"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMysqlConnector",
+"max.task": "1",
+"connect.topicname": "debezium-source-topic1000",
+"kafka.transforms": "Unwrap",
+"kafka.transforms.Unwrap.delete.handling.mode": "none",
+"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+"database.history.skip.unparseable.ddl": true,
+"database.history.name.srv.addr": "localhost:9876",
+"database.history.rocketmq.topic": "db-history-debezium-topic1000",
+"database.history.store.only.monitored.tables.ddl": true,
+"include.schema.changes": false,
+"database.server.name": "dbserver1",
+"database.port": 3306,
+"database.hostname": "数据库ip",
+"database.connectionTimeZone": "UTC",
+"database.user": "debezium",
+"database.password": "dbz",
+"table.include.list": "bank.user",
+"max.batch.size": 50,
+"database.include.list": "bank",
+"snapshot.mode": "when_needed",
+"database.server.id": "184054",
+"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+```
+
+同步原表数据:bank.user
作用:通过解析Postgres binlog 封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
- "connect.topicname": "debezium-postgres-source-01",
+ "connect.topicname": "debezium-source-topic1000",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
@@ -189,7 +258,7 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
- "table.whitelist": "bank.holding",
+ "table.whitelist": "bank.user",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
@@ -200,14 +269,14 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
作用:通过消费Topic中的数据,通过JDBC协议写入到目标表当中
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
- "connect.topicnames": "debezium-postgres-source-01",
+ "connect.topicnames": "debezium-source-topic1000",
"connection.url": "jdbc:mysql://数据库ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
- "pk.fields": "holding_id",
+ "pk.fields": "id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
@@ -223,11 +292,12 @@ curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connector
```
-以上两个Connector任务创建成功以后
-通过start_data_engineer/password账号登录数据库
+以上三个Connector任务创建成功以后
+通过start_data_engineer/password账号登录PostgreSQL数据库
账号登录数据库
+或者通过root/debezium账号登录MySQL数据库
-对源数据库表:bankholding增删改
-即可同步到目标表bank1.holding
+对源数据库表:bank.user增删改
+都会同步到同步到目标表MySQL bank1.user
diff --git
a/versioned_docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
b/versioned_docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
new file mode 100644
index 000000000..06e944994
Binary files /dev/null and
b/versioned_docs/version-5.0/picture/32rocketmq-connect/RocketMQ-Connect-Integration-Demo.jpg
differ