This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch revert-267-add_rocketmq_connect_doc
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git
commit 75b7d2d155706b6056e5ac6d267aa0768be45ee7
Author: Heng Du <[email protected]>
AuthorDate: Thu Sep 29 11:19:07 2022 +0800

    Revert "[ISSUE #266] add rocketmq connect doc (#267)"

    This reverts commit 016c38cb35db112ed62dd1dace61005d3c7e6ff1.
---
 .../25RocketMQ Connect Overview.md"                | 42 ----
 .../26RocketMQ Connect Concept.md"                 | 28 ---
 .../27RocketMQ Connect Quick Start.md"             | 182 ----------------
 .../28RocketMQ Connect In Action1.md"              | 239 ---------------------
 .../29RocketMQ Connect In Action2.md"              | 233 --------------------
 .../32rocketmq-connect/Connector-Task-Concept.png  | Bin 39593 -> 0 bytes
 .../32rocketmq-connect/Connector-Task-process.png  | Bin 74579 -> 0 bytes
 docs/picture/32rocketmq-connect/deploy1.png        | Bin 7300 -> 0 bytes
 docs/picture/32rocketmq-connect/deploy2.png        | Bin 7838 -> 0 bytes
 docs/picture/32rocketmq-connect/deploy3.png        | Bin 26127 -> 0 bytes
 docs/picture/32rocketmq-connect/deploy4.png        | Bin 9377 -> 0 bytes
 docs/picture/32rocketmq-connect/overview.png       | Bin 38735 -> 0 bytes
 docs/picture/32rocketmq-connect/scene.png          | Bin 14617 -> 0 bytes
 docs/picture/32rocketmq-connect/worker.png         | Bin 17049 -> 0 bytes
 .../en/docusaurus-plugin-content-docs/current.json | 4 -
 .../25RocketMQ Connect Overview.md"                | 42 ----
 .../26RocketMQ Connect Concept.md"                 | 28 ---
 .../27RocketMQ Connect Quick Start.md"             | 182 ----------------
 .../28RocketMQ Connect In Action1.md"              | 239 ---------------------
 .../29RocketMQ Connect In Action2.md"              | 233 --------------------
 .../32rocketmq-connect/Connector-Task-Concept.png  | Bin 39593 -> 0 bytes
 .../32rocketmq-connect/Connector-Task-process.png  | Bin 74579 -> 0 bytes
 .../current/picture/32rocketmq-connect/deploy1.png | Bin 7300 -> 0 bytes
 .../current/picture/32rocketmq-connect/deploy2.png | Bin 7838 -> 0 bytes
 .../current/picture/32rocketmq-connect/deploy3.png | Bin 26127 -> 0 bytes
 .../current/picture/32rocketmq-connect/deploy4.png | Bin 9377 -> 0 bytes
 .../picture/32rocketmq-connect/overview.png        | Bin 38735 -> 0 bytes
 .../current/picture/32rocketmq-connect/scene.png   | Bin 14617 -> 0 bytes
 .../current/picture/32rocketmq-connect/worker.png  | Bin 17049 -> 0 bytes
 .../version-5.0.json                               | 4 -
 .../25RocketMQ Connect Overview.md"                | 42 ----
 .../26RocketMQ Connect Concept.md"                 | 28 ---
 .../27RocketMQ Connect Quick Start.md"             | 182 ----------------
 .../28RocketMQ Connect In Action1.md"              | 239 ---------------------
 .../29RocketMQ Connect In Action2.md"              | 233 --------------------
 .../32rocketmq-connect/Connector-Task-Concept.png  | Bin 39593 -> 0 bytes
 .../32rocketmq-connect/Connector-Task-process.png  | Bin 74579 -> 0 bytes
 .../picture/32rocketmq-connect/deploy1.png         | Bin 7300 -> 0 bytes
 .../picture/32rocketmq-connect/deploy2.png         | Bin 7838 -> 0 bytes
 .../picture/32rocketmq-connect/deploy3.png         | Bin 26127 -> 0 bytes
 .../picture/32rocketmq-connect/deploy4.png         | Bin 9377 -> 0 bytes
 .../picture/32rocketmq-connect/overview.png        | Bin 38735 -> 0 bytes
 .../picture/32rocketmq-connect/scene.png           | Bin 14617 -> 0 bytes
 .../picture/32rocketmq-connect/worker.png          | Bin 17049 -> 0 bytes
 .../25RocketMQ Connect Overview.md"                | 42 ----
 .../26RocketMQ Connect Concept.md"                 | 28 ---
 .../27RocketMQ Connect Quick Start.md"             | 182 ----------------
 .../28RocketMQ Connect In Action1.md"              | 239 ---------------------
 .../29RocketMQ Connect In Action2.md"              | 233 --------------------
 .../32rocketmq-connect/Connector-Task-Concept.png  | Bin 39593 -> 0 bytes
 .../32rocketmq-connect/Connector-Task-process.png  | Bin 74579 -> 0 bytes
 .../picture/32rocketmq-connect/deploy1.png         | Bin 7300 -> 0 bytes
 .../picture/32rocketmq-connect/deploy2.png         | Bin 7838 -> 0 bytes
 .../picture/32rocketmq-connect/deploy3.png         | Bin 26127 -> 0 bytes
 .../picture/32rocketmq-connect/deploy4.png         | Bin 9377 -> 0 bytes
 .../picture/32rocketmq-connect/overview.png        | Bin 38735 -> 0 bytes
 .../picture/32rocketmq-connect/scene.png           | Bin 14617 -> 0 bytes
 .../picture/32rocketmq-connect/worker.png          | Bin 17049 -> 0 bytes
 58 files changed, 2904 deletions(-)

diff --git
"a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
deleted file mode 100644
index 4324b210..00000000
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
+++ /dev/null
@@ -1,42 +0,0 @@
# RocketMQ Connect Overview

RocketMQ Connect is a key data-integration component of RocketMQ. It streams data into and out of RocketMQ from all kinds of systems in an efficient, reliable, streaming fashion, and is a separate distributed, scalable, fault-tolerant system independent of RocketMQ itself.
It offers low latency, high reliability, high performance, low-code configuration, and strong extensibility, and can connect heterogeneous data systems to build data pipelines, ETL, CDC, data lakes, and more.

### How Connectors work

RocketMQ Connect is a standalone distributed, scalable, fault-tolerant system whose main job is to move data between RocketMQ and external systems. No programming is needed; a simple configuration is enough. For example, to sync data from MySQL to RocketMQ you only configure the MySQL account, password, and connection address, plus the databases and tables to sync.

### Connector use cases

##### Building streaming data pipelines

In a business system you may rely on MySQL's mature transaction support for inserts, updates, and deletes, use ElasticSearch or Solr for powerful search, or sync business data into an analytics system or data lake (e.g. Hudi) for further processing that extracts more value from the data. RocketMQ Connect makes such pipelines easy: configure three tasks, one that pulls data from MySQL and two that consume from RocketMQ into ElasticSearch and Hudi, and you get two pipelines, MySQL to ElasticSearch and MySQL to Hudi, covering transactional needs, search needs, and data-lake construction at once.

##### CDC

CDC, one of the ETL patterns, incrementally captures a database's INSERT, UPDATE, and DELETE changes in near real time. RocketMQ Connect's streaming data transfer is highly available and low latency, so CDC is easy to implement with a Connector.

### Connector deployment

Connectors are usually created through configuration. A Connector generally consists of the logical Connector itself and the Tasks, physical threads, that actually copy data. The figure below shows two Connectors and their corresponding running Tasks.

A Connector can also run multiple tasks at once to raise its parallelism. For example, the Hudi Sink Connector in the figure below has two tasks, each handling a different shard of the data, which raises the Connector's parallelism and with it the processing throughput.

RocketMQ Connect Workers support two run modes: cluster and standalone.
Cluster mode, as the name suggests, consists of multiple Worker nodes; at least two are recommended for a highly available cluster. Cluster-wide config, offset, and status information is stored in designated RocketMQ topics; a newly added Worker node picks up this information and triggers a rebalance that redistributes the cluster's tasks into a balanced state. Removing a Worker node or a Worker crash also triggers a rebalance, so every task keeps running, evenly spread across the surviving nodes.

Standalone mode runs Connector tasks on a single machine. The Worker itself has no high availability, and task offsets are persisted locally. It suits scenarios with no HA requirement, or where the Worker does not need to provide HA itself, for example a deployment in a k8s cluster where k8s guarantees availability.

diff --git "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
"b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
deleted file mode 100644
index f0981b4c..00000000
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
+++ /dev/null
@@ -1,28 +0,0 @@
# Concept

## Connector

A connector defines where data is copied from and to. One that reads from a source system and writes into RocketMQ is a SourceConnector; one that reads from RocketMQ and writes to a target system is a SinkConnector. The Connector decides how many tasks need to be created, and receives configuration from the Worker to pass on to its tasks.

## Task

A Task is the smallest unit into which a Connector's work is sharded and allocated, and the actual executor that copies data from the source into RocketMQ (SourceTask) or reads data from RocketMQ and writes it into the target system (SinkTask). Tasks are stateless and can be started and stopped dynamically; multiple Tasks can run in parallel, so a Connector's copy parallelism is mainly reflected in its number of Tasks.

The Connect API also shows the respective responsibilities of Connector and Task. A Connector implementation already fixes the direction of the data copy: the Connector receives the source-related configuration, taskClass returns the task type to create, and taskConfigs specifies the maximum number of tasks and assigns each task its configuration. A task then takes that configuration, reads from the data source, and writes to the target store.

The two figures below show the basic processing flow of Connector and Task.

## Worker

The worker process is the runtime environment for Connectors and Tasks. It exposes a RESTful interface, accepts HTTP requests, and passes the received configuration on to Connectors and Tasks.
It is also responsible for starting Connectors and Tasks, persisting Connector configuration, persisting the position information of Task data sync, and load balancing; Connect cluster high availability, scaling, and failure handling all rely mainly on the Worker's rebalancing capability.

As the figure above shows, the Worker receives HTTP requests through its REST API and hands the configuration to the configuration-management service, which saves it locally, syncs it to the other worker nodes, and triggers a rebalance.

diff --git "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md"
deleted file mode 100644
index 31c95895..00000000
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md"
+++ /dev/null
@@ -1,182 +0,0 @@
# Quick Start

[](https://www.apache.org/licenses/LICENSE-2.0.html)

In standalone mode, [rocketmq-connect-sample] serves as the demo.

rocketmq-connect-sample reads data from a source file and sends it to a RocketMQ cluster, then reads the messages back from the topic and writes them to a target file.

## 1. Prerequisites

1. Linux/Unix/Mac
2. 64-bit JDK 1.8+;
3. Maven 3.2.x or later;
4. A running [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
5. Create a test topic
> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8

**tips**: where ${ROCKETMQ_HOME} points

> bin-release.zip version: /rocketmq-all-4.9.4-bin-release
>
> source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution

## 2. Build Connect

```
git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U
```

## 3. Run the Worker

```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
```
**tips**: you can edit /bin/runconnect.sh to adjust the JVM parameters

> JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

The runtime started successfully when you see:

> The standalone worker boot success.

View the startup log:

> tail -100f ~/logs/rocketmqconnect/connect_runtime.log

Ctrl+C exits the log view.

## 4. Start the source connector

Create a test file test-source-file.txt in the current directory:
```
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
```

The file source connector started successfully when you see the following log:

> tail -100f ~/logs/rocketmqconnect/connect_runtime.log
>
> 2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-...
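The "The standalone worker boot success." message from step 3 can also be checked by script instead of by eye. A minimal sketch under stated assumptions: the `wait_for_worker` helper name and the 30-second default timeout are illustrative and not part of the distribution; the log path is the default used throughout this guide.

```shell
#!/bin/sh
# wait_for_worker LOGFILE [TIMEOUT_SECS]
# Poll the runtime log until the standalone worker reports a successful
# boot, or give up after the timeout (default 30 seconds).
wait_for_worker() {
    log="$1"; timeout="${2:-30}"; waited=0
    until grep -q "The standalone worker boot success" "$log" 2>/dev/null; do
        [ "$waited" -ge "$timeout" ] && return 1
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}

# Example against the default runtime log location:
# wait_for_worker ~/logs/rocketmqconnect/connect_runtime.log 60 && echo "worker is up"
```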
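A portability note on the `echo "Hello \r\nRocketMQ\r\n Connect"` command above: whether `echo` interprets the `\r\n` escapes depends on the shell, so the test file's contents can vary by platform. A sketch using `printf`, which handles backslash escapes consistently (the three-line payload mirrors the sample; this is an alternative, not the command from the distribution docs):

```shell
# printf interprets backslash escapes the same way in every POSIX shell,
# so the generated test file is identical regardless of platform.
printf 'Hello\r\nRocketMQ\r\nConnect\r\n' > test-source-file.txt

# The file now contains three CRLF-terminated lines.
wc -l < test-source-file.txt
```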
#### Source connector configuration

| key               | nullable | default | description                                                  |
|-------------------|----------|---------|--------------------------------------------------------------|
| connector.class   | false    |         | Fully qualified name of the class implementing the Connector interface |
| filename          | false    |         | Name of the source data file                                 |
| connect.topicname | false    |         | Topic used to sync the file data                             |

## 5. Start the sink connector

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'

cat test-sink-file.txt
```

> tail -100f ~/logs/rocketmqconnect/connect_runtime.log

The file sink connector started successfully when you see the following log:

> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-...

If test-sink-file.txt is generated with the same content as the source file, the whole flow works. The line order may differ; this is normal, because RocketMQ distributes messages across queues and messages from different queues may be received in a different order.

#### Sink connector configuration

| key                | nullable | default | description                                                  |
|--------------------|----------|---------|--------------------------------------------------------------|
| connector.class    | false    |         | Fully qualified name of the class implementing the Connector interface |
| filename           | false    |         | File where the data pulled by the sink is saved              |
| connect.topicnames | false    |         | Topics whose messages the sink processes                     |

```
Note: the source/sink configuration above describes the rocketmq-connect-sample demo. Configuration differs between connectors; consult the specific source/sink connector.
```

## 6. Stop a connector

```shell
# Send a GET request to
# http://(your worker ip):(port)/connectors/(connector name)/stop

# Stop the two connectors from this demo
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
```
The connector stopped successfully when you see:

> **Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}}

## 7. Stop the Worker process

```
sh bin/connectshutdown.sh
```

## 8. Log directory

> ${user.home}/logs/rocketmqconnect

## 9. Configuration files

Default directory for persisted configuration: /tmp/storeRoot

| key                  | description                                       |
|----------------------|---------------------------------------------------|
| connectorConfig.json | Persisted connector configuration                 |
| position.json        | Persisted source connector processing position    |
| taskConfig.json      | Persisted task configuration                      |
| offset.json          | Persisted sink connector consumption progress     |
| connectorStatus.json | Persisted connector status                        |
| taskStatus.json      | Persisted task status                             |

## 10. Configuration

Adjust the [RESTful](https://restfulapi.cn/) port, storeRoot path, Nameserver address, etc. as needed.

File location: conf/connect-standalone.conf under the worker start directory

```shell
# current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http port for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/home/connect/storeRoot

# Change this to your own rocketmq nameserver address
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Directory for loading Connector plugins, similar to the JVM loading jars or
# classes at startup; place Connector implementation plugins here.
# Both files and directories are supported.
# Source or sink connector jar file dir
pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar

# Alternatively: keep Connector implementation plugins in a dedicated folder
# pluginPaths=/usr/local/connector-plugins/*
```
\ No newline at end of file
diff --git "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
deleted file mode 100644
index fbd1ba41..00000000
---
"a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
+++ /dev/null
@@ -1,239 +0,0 @@
# RocketMQ Connect in Action 1

MySQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)

## Preparation

### Start RocketMQ

1. Linux/Unix/Mac
2. 64-bit JDK 1.8+;
3. Maven 3.2.x or later;
4. A running [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);

**tips**: where ${ROCKETMQ_HOME} points

> bin-release.zip version: /rocketmq-all-4.9.4-bin-release
>
> source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution

### Start Connect

#### Build the connector plugins

Debezium RocketMQ Connector
```
$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true
```

Copy the built Debezium MySQL RocketMQ Connector package into the Runtime load directory:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
```

JDBC Connector

Copy the built JDBC Connector package into the Runtime load directory:
```
$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
$ mvn clean package -Dmaven.test.skip=true
cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
```

#### Start the Connect Runtime
```
cd rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U
```

Edit `connect-standalone.conf`; the key settings are:
```
$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
```

```
workerId=standalone-worker
storePathRootDir=/tmp/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678

autoCreateGroupEnable=false
clusterName="DefaultCluster"

# Key setting: point this at the plugin directory holding the debezium package built above
# Source or sink connector jar file dir, the default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins
```

```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
```

### MySQL image
Set up the MySQL database with debezium's MySQL docker image (image names and env vars corrected to their canonical casing, since docker image names must be lowercase):
```
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL info

Port: 3306

Account: root/debezium

Replication account: debezium/dbz

### Test data

Log in to the database as root/debezium.

Source table: inventory.employee

```
CREATE database inventory;

use inventory;
CREATE TABLE `employee` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(128) DEFAULT NULL,
`howold` int DEFAULT NULL,
`male` int DEFAULT NULL,
`company` varchar(128) DEFAULT NULL,
`money` double DEFAULT NULL,
`begin_time` datetime DEFAULT NULL,
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;

INSERT INTO `employee` VALUES (1, 'name-01', 24, 6, 'company', 9987, '2021-12-22 08:00:00', '2022-06-14 18:20:11', 321.11);
INSERT INTO `employee` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00', '2022-06-14 18:18:47', 77.12);
INSERT INTO `employee` VALUES (8, 'name-03', 20, 1, NULL, 0, NULL, '2022-06-14 18:26:05', 11111.00);
INSERT INTO `employee` VALUES (9, 'name-04', 21, 1, 'company', 12345, '2021-12-24 20:44:10', '2022-06-14 18:20:02', 123.12);
INSERT INTO `employee` VALUES (11, 'name-05', 50, 2, 'company', 33333, '2021-12-24 22:14:52', '2022-06-14 18:19:58', 123.12);
INSERT INTO `employee` VALUES (12, 'name-06', 19, 3, NULL, 0, NULL, '2022-06-14 18:26:12', 111233.00);
INSERT INTO `employee` VALUES (13, 'name-07', 20, 4, 'company', 3237, '2021-12-29 01:31:03', '2022-06-14 18:19:27', 52.00);
INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39', '2022-06-14 18:18:32', 0.00);
INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
```

Target table: inventory_2.employee
```
CREATE database inventory_2;
use inventory_2;
CREATE TABLE `employee` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(128) DEFAULT NULL,
`howold` int DEFAULT NULL,
`male` int DEFAULT NULL,
`company` varchar(128) DEFAULT NULL,
`money` double DEFAULT NULL,
`begin_time` datetime DEFAULT NULL,
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;
```

## Start the Connectors

### Start the Debezium source connector

Syncs the source table inventory.employee.
Role: parses the MySQL binlog, wraps the changes into generic ConnectRecord objects, and sends them to a RocketMQ topic.

The original curl command was malformed (missing spaces and the `-d` flag) and the class/topic names had been auto-capitalized; the corrected invocation:

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
"max.task": "1",
"connect.topicname": "debezium-mysql-source-topic",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.skip.unparseable.ddl": true,
"database.history.name.srv.addr": "localhost:9876",
"database.history.rocketmq.topic": "db-history-debezium-topic",
"database.history.store.only.monitored.tables.ddl": true,
"include.schema.changes": false,
"database.server.name": "dbserver1",
"database.port": 3306,
"database.hostname": "<database ip>",
"database.connectionTimeZone": "UTC",
"database.user": "debezium",
"database.password": "dbz",
"table.include.list": "inventory.employee",
"max.batch.size": 50,
"database.include.list": "inventory",
"snapshot.mode": "when_needed",
"database.server.id": "184054",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
```

### Start the JDBC sink connector

Role: consumes the data from the topic and writes it to the target table via JDBC (connect.topicnames is aligned with the source connector's connect.topicname above):

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{
  "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
  "max.task": "2",
  "connect.topicnames": "debezium-mysql-source-topic",
  "connection.url": "jdbc:mysql://<database ip>:3306/inventory_2",
  "connection.user": "root",
  "connection.password": "debezium",
  "pk.fields": "id",
  "table.name.from.header": "true",
  "pk.mode": "record_key",
  "insert.mode": "UPSERT",
  "db.timezone": "UTC",
  "table.types": "TABLE",
  "errors.deadletterqueue.topic.name": "dlq-topic",
  "errors.log.enable": "true",
  "errors.tolerance": "ALL",
  "delete.enabled": "true",
  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
```

Once both Connector tasks are created, log in to the database as root/debezium; inserts, updates, and deletes on the source table inventory.employee are synced to the target table inventory_2.employee.

diff --git "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" "b/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
deleted file mode 100644
index 5abc1c2b..00000000
--- "a/docs/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
+++ /dev/null
@@ -1,233 +0,0 @@
# RocketMQ Connect in Action 2

PostgreSQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)

## Preparation

### Start RocketMQ

1. Linux/Unix/Mac
2. 64-bit JDK 1.8+;
3. Maven 3.2.x or later;
4. A running [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);

**tips**: where ${ROCKETMQ_HOME} points

> bin-release.zip version: /rocketmq-all-4.9.4-bin-release
>
> source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution

### Start Connect

#### Build the connector plugins

Debezium RocketMQ Connector
```
$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
$ mvn clean package -Dmaven.test.skip=true
```

Copy the built Debezium PostgreSQL RocketMQ Connector package into the Runtime load directory:
```
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
```

JDBC Connector

Copy the built JDBC Connector package into the Runtime load directory:
```
$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
$ mvn clean package -Dmaven.test.skip=true
cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
```

#### Start the Connect Runtime

```
cd rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U
```

Edit `connect-standalone.conf`; the key settings are:
```
$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
```

```
workerId=standalone-worker
storePathRootDir=/tmp/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678

autoCreateGroupEnable=false
clusterName="DefaultCluster"

# Key setting: point this at the plugin directory holding the debezium package built above
# Source or sink connector jar file dir, the default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins
```

```
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
```

### Postgres image

Set up the PostgreSQL database with debezium's Postgres docker image:
```
# starting a pg instance
docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14

# bash into postgres instance
docker exec -ti postgres /bin/bash
```
Postgres info
Port: 5432
Account: start_data_engineer/password
Source database to sync: bank.holding
Target: bank1.holding

### MySQL image

Set up the MySQL database with debezium's MySQL docker image (image names and env vars corrected to their canonical casing):
```
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
```
MySQL info

Port: 3306

Account: root/debezium

### Test data

Log in to the database as start_data_engineer/password.

Source table: bank.holding (the original script inserted holding_id 1000 twice, which would violate the primary key; the duplicate is removed here)

```
CREATE SCHEMA bank;
SET search_path TO bank,public;
CREATE TABLE bank.holding (
    holding_id int,
    user_id int,
    holding_stock varchar(8),
    holding_quantity int,
    datetime_created timestamp,
    datetime_updated timestamp,
    primary key(holding_id)
);
ALTER TABLE bank.holding replica identity FULL;
insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
update bank.holding set holding_quantity = 300 where holding_id=1000;
```

Target table: bank1.holding
```
create database bank1;
use bank1;
CREATE TABLE holding (
    holding_id int,
    user_id int,
    holding_stock varchar(8),
    holding_quantity int,
    datetime_created bigint,
    datetime_updated bigint,
    primary key(holding_id)
);
```

## Start the Connectors

### Start the Debezium source connector

Syncs the source table bank.holding.
Role: parses the PostgreSQL write-ahead log (WAL), wraps the changes into generic ConnectRecord objects, and sends them to a RocketMQ topic.

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
  "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
  "max.task": "1",
  "connect.topicname": "debezium-postgres-source-01",
  "kafka.transforms": "Unwrap",
  "kafka.transforms.Unwrap.delete.handling.mode": "none",
  "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
  "database.history.skip.unparseable.ddl": true,
  "database.server.name": "bankserver1",
  "database.port": 5432,
  "database.hostname": "<database ip>",
  "database.connectionTimeZone": "UTC",
  "database.user": "start_data_engineer",
  "database.dbname": "start_data_engineer",
  "database.password": "password",
  "table.whitelist": "bank.holding",
  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
```

### Start the JDBC sink connector

Role: consumes the data from the topic and writes it to the target table via JDBC.

```
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
  "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
  "max.task": "2",
  "connect.topicnames": "debezium-postgres-source-01",
  "connection.url": "jdbc:mysql://<database ip>:3306/bank1",
  "connection.user": "root",
  "connection.password": "debezium",
  "pk.fields": "holding_id",
  "table.name.from.header": "true",
  "pk.mode": "record_key",
  "insert.mode": "UPSERT",
  "db.timezone": "UTC",
  "table.types": "TABLE",
  "errors.deadletterqueue.topic.name": "dlq-topic",
  "errors.log.enable": "true",
  "errors.tolerance": "ALL",
  "delete.enabled": "true",
  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
```

Once both Connector tasks are created, log in to the database as start_data_engineer/password; inserts, updates, and deletes on the source table bank.holding are synced to the target table bank1.holding.

diff --git
a/docs/picture/32rocketmq-connect/Connector-Task-Concept.png b/docs/picture/32rocketmq-connect/Connector-Task-Concept.png
deleted file mode 100644
index b500ded2..00000000
Binary files a/docs/picture/32rocketmq-connect/Connector-Task-Concept.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/Connector-Task-process.png b/docs/picture/32rocketmq-connect/Connector-Task-process.png
deleted file mode 100644
index 69c08cdf..00000000
Binary files a/docs/picture/32rocketmq-connect/Connector-Task-process.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/deploy1.png b/docs/picture/32rocketmq-connect/deploy1.png
deleted file mode 100644
index 3d7de5ba..00000000
Binary files a/docs/picture/32rocketmq-connect/deploy1.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/deploy2.png b/docs/picture/32rocketmq-connect/deploy2.png
deleted file mode 100644
index 79039822..00000000
Binary files a/docs/picture/32rocketmq-connect/deploy2.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/deploy3.png b/docs/picture/32rocketmq-connect/deploy3.png
deleted file mode 100644
index 185ef93b..00000000
Binary files a/docs/picture/32rocketmq-connect/deploy3.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/deploy4.png b/docs/picture/32rocketmq-connect/deploy4.png
deleted file mode 100644
index 0fbe3254..00000000
Binary files a/docs/picture/32rocketmq-connect/deploy4.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/overview.png b/docs/picture/32rocketmq-connect/overview.png
deleted file mode 100644
index 22b6142d..00000000
Binary files a/docs/picture/32rocketmq-connect/overview.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/scene.png b/docs/picture/32rocketmq-connect/scene.png
deleted file mode 100644
index 90d05a6e..00000000
Binary files a/docs/picture/32rocketmq-connect/scene.png and /dev/null differ
diff --git a/docs/picture/32rocketmq-connect/worker.png b/docs/picture/32rocketmq-connect/worker.png
deleted file mode 100644
index ad46d38a..00000000
Binary files a/docs/picture/32rocketmq-connect/worker.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/current.json b/i18n/en/docusaurus-plugin-content-docs/current.json
index 34252e0b..f24c6a7f 100644
--- a/i18n/en/docusaurus-plugin-content-docs/current.json
+++ b/i18n/en/docusaurus-plugin-content-docs/current.json
@@ -30,9 +30,5 @@
   "sidebar.myAutogeneratedSidebar.category.贡献指南": {
     "message": "Contribution Guide",
     "description": "The label for category 贡献指南 in sidebar myAutogeneratedSidebar"
-  },
-  "sidebar.myAutogeneratedSidebar.category.数据集成": {
-    "message": "Data Integration",
-    "description": "The label for category 数据集成 in sidebar myAutogeneratedSidebar"
   }
 }
\ No newline at end of file
diff --git "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" "b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
deleted file mode 100644
index 03d4216d..00000000
--- "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
+++ /dev/null
@@ -1,42 +0,0 @@
# RocketMQ Connect Overview

RocketMQ Connect is a key data-integration component of RocketMQ. It streams data into and out of RocketMQ from all kinds of systems in an efficient, reliable, streaming fashion, and is a separate distributed, scalable, fault-tolerant system independent of RocketMQ itself.
It offers low latency, high reliability, high performance, low-code configuration, and strong extensibility, and can connect heterogeneous data systems to build data pipelines, ETL, CDC, data lakes, and more.

### How Connectors work

RocketMQ Connect is a standalone distributed, scalable, fault-tolerant system whose main job is to move data between RocketMQ and external systems. No programming is needed; a simple configuration is enough. For example, to sync data from MySQL to RocketMQ you only configure the MySQL account, password, and connection address, plus the databases and tables to sync.

### Connector use cases

##### Building streaming data pipelines

In a business system you may rely on MySQL's mature transaction support for inserts, updates, and deletes, use ElasticSearch or Solr for powerful search, or sync business data into an analytics system or data lake (e.g. Hudi) for further processing that extracts more value from the data. RocketMQ Connect makes such pipelines easy: configure three tasks, one that pulls data from MySQL and two that consume from RocketMQ into ElasticSearch and Hudi, and you get two pipelines, MySQL to ElasticSearch and MySQL to Hudi, covering transactional needs, search needs, and data-lake construction at once.

##### CDC

CDC, one of the ETL patterns, incrementally captures a database's INSERT, UPDATE, and DELETE changes in near real time. RocketMQ Connect's streaming data transfer is highly available and low latency, so CDC is easy to implement with a Connector.

### Connector deployment

Connectors are usually created through configuration. A Connector generally consists of the logical Connector itself and the Tasks, physical threads, that actually copy data. The figure below shows two Connectors and their corresponding running Tasks.

A Connector can also run multiple tasks at once to raise its parallelism. For example, the Hudi Sink Connector in the figure below has two tasks, each handling a different shard of the data, which raises the Connector's parallelism and with it the processing throughput.

RocketMQ Connect Workers support two run modes: cluster and standalone.
Cluster mode, as the name suggests, consists of multiple Worker nodes; at least two are recommended for a highly available cluster. Cluster-wide config, offset, and status information is stored in designated RocketMQ topics; a newly added Worker node picks up this information and triggers a rebalance that redistributes the cluster's tasks into a balanced state. Removing a Worker node or a Worker crash also triggers a rebalance, so every task keeps running, evenly spread across the surviving nodes.

Standalone mode runs Connector tasks on a single machine. The Worker itself has no high availability, and task offsets are persisted locally. It suits scenarios with no HA requirement, or where the Worker does not need to provide HA itself, for example a deployment in a k8s cluster where k8s guarantees availability.

diff --git "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md" "b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
deleted file mode 100644
index 53c85258..00000000
--- "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
+++ /dev/null
@@ -1,28 +0,0 @@
# Concept

## Connector

A connector defines where data is copied from and to. One that reads from a source system and writes into RocketMQ is a SourceConnector; one that reads from RocketMQ and writes to a target system is a SinkConnector. The Connector decides how many tasks need to be created, and receives configuration from the Worker to pass on to its tasks.

## Task

A Task is the smallest unit into which a Connector's work is sharded and allocated, and the actual executor that copies data from the source into RocketMQ (SourceTask) or reads data from RocketMQ and writes it into the target system (SinkTask). Tasks are stateless and can be started and stopped dynamically; multiple Tasks can run in parallel, so a Connector's copy parallelism is mainly reflected in its number of Tasks.

The Connect API also shows the respective responsibilities of Connector and Task. A Connector implementation already fixes the direction of the data copy: the Connector receives the source-related configuration, taskClass returns the task type to create, and taskConfigs specifies the maximum number of tasks and assigns each task its configuration. A task then takes that configuration, reads from the data source, and writes to the target store.

The two figures below show the basic processing flow of Connector and Task.

- -## Worker - -Worker 进程是Connector和Task的运行环境,它提供RESTful能力,接受HTTP请求,将获取到的配置传递给Connector和Task。 -除此之外,它还负责启动Connector和Task、保存Connector配置信息、保存Task同步数据的位点信息,并提供负载均衡能力;Connect集群的高可用、扩缩容、故障处理主要依赖Worker的负载均衡能力实现。 - - - -从上面这张图可以看到,Worker通过提供的REST API接收HTTP请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它Worker节点,同时触发负载均衡。 diff --git "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" "b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" deleted file mode 100644 index 1db8ccae..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" +++ /dev/null @@ -1,182 +0,0 @@ -# Quick Start - -[](https://www.apache.org/licenses/LICENSE-2.0.html) - -# 快速开始 - -单机模式下以[rocketmq-connect-sample]作为 demo - -rocketmq-connect-sample的主要作用是从源文件中读取数据发送到RocketMQ集群,然后从Topic中读取消息,写入到目标文件。 - -## 1.准备 - -1. Linux/Unix/Mac -2. 64bit JDK 1.8+; -3. Maven 3.2.x或以上版本; -4. 启动 [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); -5. 创建测试Topic -> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8 - - -**tips** : ${ROCKETMQ_HOME} 位置说明 - ->bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution - - -## 2.构建Connect - -``` -git clone https://github.com/apache/rocketmq-connect.git - -cd rocketmq-connect - -mvn -Prelease-connect -DskipTests clean install -U - -``` - -## 3.运行Worker - -``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT - -sh bin/connect-standalone.sh -c conf/connect-standalone.conf & - -``` -**tips**: 可修改 /bin/runconnect.sh 适当调整 JVM Parameters Configuration - ->JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m" - -runtime启动成功: - ->The standalone worker boot success.
- -查看启动日志文件: - ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log - -ctrl + c 退出日志 - -## 4.启动source connector - -当前目录创建测试文件 test-source-file.txt -``` -touch test-source-file.txt - -echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt - -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}' -``` - -看到以下日志说明 file source connector 启动成功了 - ->tail -100f ~/logs/rocketmqconnect/connect_runtime.log -> ->2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-... - -#### source connector配置说明 - -| key | nullable | default | description | -|-------------------| -------- | ---------------------|--------------------------| -| connector.class | false | | 实现 Connector接口的类名称(包含包名) | -| filename | false | | 数据源文件名称 | -| connect.topicname | false | | 同步文件数据所需topic | - - -## 5.启动sink connector - -``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}' - -cat test-sink-file.txt -``` - - -> tail -100f ~/logs/rocketmqconnect/connect_runtime.log - -看到以下日志说明file sink connector 启动成功了 - -> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-... 
- -如果 test-sink-file.txt 生成并且与 test-source-file.txt 内容一样,说明整个流程正常运行。 -文件内容可能顺序不一样,这主要是因为RocketMQ发送到不同queue时,接收不同queue消息的顺序可能不一致导致的,属于正常现象。 - -#### sink connector配置说明 - -| key | nullable | default | description | -|--------------------| -------- | ------- | -------------------------------------------------------------------------------------- | -| connector.class | false | | 实现Connector接口的类名称(包含包名) | -| filename | false | | sink拉取的数据保存到的文件 | -| connect.topicnames | false | | sink需要处理的消息topics | - -``` -注:source/sink配置文件说明是以rocketmq-connect-sample为demo,不同source/sink connector配置有差异,请以具体source/sink connector 为准 -``` - -## 6.停止connector - -```shell -GET请求 -http://(your worker ip):(port)/connectors/(connector name)/stop - -停止demo中的两个connector -curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop -curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop - -``` -看到以下日志说明connector停止成功了 - ->**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}} - -## 7.停止Worker进程 - -``` -sh bin/connectshutdown.sh -``` - -## 8.日志目录 - ->${user.home}/logs/rocketmqconnect - -## 9.配置文件 - -持久化配置文件默认目录 /tmp/storeRoot - -| key | description | -|----------------------|---------------------------| -| connectorConfig.json | connector配置持久化文件 | -| position.json | source connect数据处理进度持久化文件 | -| taskConfig.json | task配置持久化文件 | -| offset.json | sink connect数据消费进度持久化文件 | -| connectorStatus.json | connector 状态持久化文件 | -| taskStatus.json | task 状态持久化文件 | - -## 10.配置说明 - -可根据使用情况修改 [RESTful](https://restfulapi.cn/) 端口,storeRoot 路径,Nameserver 地址等信息 - -文件位置:worker 启动目录下 conf/connect-standalone.conf - -```shell -#current 
cluster node uniquely identifies -workerId=DEFAULT_WORKER_1 - -# Http port for user to access REST API -httpPort=8082 - -# Local file dir for config store -storePathRootDir=/home/connect/storeRoot - -#需要修改为自己的rocketmq nameserver 接入点 -# RocketMQ namesrvAddr -namesrvAddr=127.0.0.1:9876 - -#用于加载Connector插件,类似于jvm启动加载jar包或者class类,这里的目录用于存放Connector相关的实现插件,支持文件和目录 -# Source or sink connector jar file dir -pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar - -# 补充:将 Connector 相关实现插件保存到指定文件夹 -# pluginPaths=/usr/local/connector-plugins/* -``` \ No newline at end of file diff --git "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" "b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" deleted file mode 100644 index 43de0359..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" +++ /dev/null @@ -1,239 +0,0 @@ -# RocketMQ Connect In Action1 - -MySQL Source(CDC) -> RocketMQ Connect -> MySQL Sink(JDBC) - -## 准备 - -### 启动RocketMQ - -1. Linux/Unix/Mac -2. 64bit JDK 1.8+; -3. Maven 3.2.x或以上版本; -4. 
启动 [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); - - - -**tips** : ${ROCKETMQ_HOME} 位置说明 - ->bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution - - -### 启动Connect - - -#### Connector插件编译 - -Debezium RocketMQ Connector -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/ -$ mvn clean package -Dmaven.test.skip=true -``` - -将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下: -``` -mkdir -p /usr/local/connector-plugins -cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins -``` - -JDBC Connector - -将 JDBC Connector 编译好的包放入Runtime加载目录。命令如下: -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/ -$ mvn clean package -Dmaven.test.skip=true -cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins - -``` - -#### 启动Connect Runtime -``` -cd rocketmq-connect - -mvn -Prelease-connect -DskipTests clean install -U - -``` - -修改配置`connect-standalone.conf` ,重点配置如下 -``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf -``` - -``` -workerId=standalone-worker -storePathRootDir=/tmp/storeRoot - -## Http port for user to access REST API -httpPort=8082 - -# Rocketmq namesrvAddr -namesrvAddr=localhost:9876 - -# RocketMQ acl -aclEnable=false -accessKey=rocketmq -secretKey=12345678 - -autoCreateGroupEnable=false -clusterName="DefaultCluster" - -# 核心配置,将之前编译好debezium包的插件目录配置在此; -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins -``` - - -``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT - -sh bin/connect-standalone.sh -c conf/connect-standalone.conf & - -``` - -### MySQL镜像 -使用debezium的MySQL docker搭建环境MySQL数据库 -``` -docker run -it 
--rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9 -``` -MySQL信息 - -端口:3306 - -账号:root/debezium - -slave:debezium/dbz - - -### 测试数据 - -通过root/debezium账号登录数据库 - -源数据库表:inventory.employee - -``` -CREATE database inventory; - -use inventory; -CREATE TABLE `employee` ( -`id` bigint NOT NULL AUTO_INCREMENT, -`name` varchar(128) DEFAULT NULL, -`howold` int DEFAULT NULL, -`male` int DEFAULT NULL, -`company` varchar(128) DEFAULT NULL, -`money` double DEFAULT NULL, -`begin_time` datetime DEFAULT NULL, -`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', -`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type', -PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8; - - - -INSERT INTO `employee` VALUES (1, 'name-01', 24, 6, 'company', 9987, '2021-12-22 08:00:00', '2022-06-14 18:20:11', 321.11); -INSERT INTO `employee` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00', '2022-06-14 18:18:47', 77.12); -INSERT INTO `employee` VALUES (8, 'name-03', 20, 1, NULL, 0, NULL, '2022-06-14 18:26:05', 11111.00); -INSERT INTO `employee` VALUES (9, 'name-04', 21, 1, 'company', 12345, '2021-12-24 20:44:10', '2022-06-14 18:20:02', 123.12); -INSERT INTO `employee` VALUES (11, 'name-05', 50, 2, 'company', 33333, '2021-12-24 22:14:52', '2022-06-14 18:19:58', 123.12); -INSERT INTO `employee` VALUES (12, 'name-06', 19, 3, NULL, 0, NULL, '2022-06-14 18:26:12', 111233.00); -INSERT INTO `employee` VALUES (13, 'name-07', 20, 4, 'company', 3237, '2021-12-29 01:31:03', '2022-06-14 18:19:27', 52.00); -INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39', '2022-06-14 18:18:32', 0.00); -INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL); - - - - -use -``` - -目标库:inventory_2.employee -``` -CREATE database inventory_2; -use 
inventory_2; -CREATE TABLE `employee` ( -`id` bigint NOT NULL AUTO_INCREMENT, -`name` varchar(128) DEFAULT NULL, -`howold` int DEFAULT NULL, -`male` int DEFAULT NULL, -`company` varchar(128) DEFAULT NULL, -`money` double DEFAULT NULL, -`begin_time` datetime DEFAULT NULL, -`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', -`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type', -PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8; -``` - -## 启动Connector - -### 启动Debezium source connector - -同步源表数据:inventory.employee -作用:通过解析MySQL binlog 封装成通用的ConnectRecord对象,发送到RocketMQ Topic当中 - -``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{ -"connector.class": "org.apache.rocketmq.connect.debezium.MySQL.DebeziumMySQLConnector", -"max.task": "1", -"connect.topicname": "debezium-MySQL-source-topic", -"kafka.transforms": "Unwrap", -"kafka.transforms.Unwrap.delete.handling.mode": "none", -"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState", -"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table", -"database.history.skip.unparseable.ddl": true, -"database.history.name.srv.addr": "localhost:9876", -"database.history.rocketmq.topic": "db-history-debezium-topic", -"database.history.store.only.monitored.tables.ddl": true, -"include.schema.changes": false, -"database.server.name": "dbserver1", -"database.port": 3306, -"database.hostname": "数据库ip", -"database.connectionTimeZone": "UTC", -"database.user": "debezium", -"database.password": "dbz", -"table.include.list": "inventory.employee", -"max.batch.size": 50, -"database.include.list": "inventory", -"snapshot.mode": "when_needed", -"database.server.id": "184054", -"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", -"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" -}' -``` - -### 
启动 jdbc sink connector - -作用:消费Topic中的数据,通过JDBC协议写入到目标表当中 - -``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{ - "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector", - "max.task": "2", - "connect.topicnames": "debezium-MySQL-source-topic", - "connection.url": "jdbc:mysql://数据库ip:3306/inventory_2", - "connection.user": "root", - "connection.password": "debezium", - "pk.fields": "id", - "table.name.from.header": "true", - "pk.mode": "record_key", - "insert.mode": "UPSERT", - "db.timezone": "UTC", - "table.types": "TABLE", - "errors.deadletterqueue.topic.name": "dlq-topic", - "errors.log.enable": "true", - "errors.tolerance": "ALL", - "delete.enabled": "true", - "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", - "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" -}' -``` - - -以上两个Connector任务创建成功以后, -通过root/debezium账号登录数据库, -对源数据库表inventory.employee进行增删改, -即可同步到目标表inventory_2.employee。 - - diff --git "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" "b/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" deleted file mode 100644 index 6404152c..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/current/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" +++ /dev/null @@ -1,233 +0,0 @@ -# RocketMQ Connect In Action2 - -PostgreSQL Source(CDC) -> RocketMQ Connect -> MySQL Sink(JDBC) - -## 准备 - -### 启动RocketMQ - -1. Linux/Unix/Mac -2. 64bit JDK 1.8+; -3. Maven 3.2.x或以上版本; -4. 
启动 [RocketMQ](https://rocketmq.apache.org/docs/quick-start/); - - - -**tips** : ${ROCKETMQ_HOME} 位置说明 - ->bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release -> ->source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution - - -### 启动Connect - - -#### Connector插件编译 - -Debezium RocketMQ Connector -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/ -$ mvn clean package -Dmaven.test.skip=true -``` - -将 Debezium MySQL RocketMQ Connector 编译好的包放入Runtime加载目录。命令如下: -``` -mkdir -p /usr/local/connector-plugins -cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins -``` - -JDBC Connector - -将 JDBC Connector 编译好的包放入Runtime加载目录。命令如下: -``` -$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/ -$ mvn clean package -Dmaven.test.skip=true -cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins - -``` - -#### 启动Connect Runtime - -``` -cd rocketmq-connect - -mvn -Prelease-connect -DskipTests clean install -U - -``` - -修改配置`connect-standalone.conf` ,重点配置如下 -``` -$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT -$ vim conf/connect-standalone.conf -``` - -``` -workerId=standalone-worker -storePathRootDir=/tmp/storeRoot - -## Http port for user to access REST API -httpPort=8082 - -# Rocketmq namesrvAddr -namesrvAddr=localhost:9876 - -# RocketMQ acl -aclEnable=false -accessKey=rocketmq -secretKey=12345678 - -autoCreateGroupEnable=false -clusterName="DefaultCluster" - -# 核心配置,将之前编译好debezium包的插件目录配置在此; -# Source or sink connector jar file dir,The default value is rocketmq-connect-sample -pluginPaths=/usr/local/connector-plugins -``` - - -``` -cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT - -sh bin/connect-standalone.sh -c conf/connect-standalone.conf & - -``` - -### Postgres镜像 - -使用debezium的Postgres 
docker搭建PostgreSQL数据库环境 -``` -# starting a pg instance -docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14 - -# bash into postgres instance -docker exec -ti postgres /bin/bash -``` -Postgres信息 -端口:5432 -账号:start_data_engineer/password -同步的源数据库表:bank.holding -目标表:bank1.holding - -### MySQL镜像 - -使用debezium的MySQL docker搭建MySQL数据库环境 -``` -docker run -it --rm --name MySQL -p 3306:3306 -e MySQL_ROOT_PASSWORD=debezium -e MySQL_USER=MySQLuser -e MySQL_PASSWORD=MySQLpw quay.io/debezium/example-MySQL:1.9 -``` -MySQL信息 - -端口:3306 - -账号:root/debezium - - -### 测试数据 - -通过start_data_engineer/password账号登录数据库 - -源数据库表:bank.holding - -``` -CREATE SCHEMA bank; -SET search_path TO bank,public; -CREATE TABLE bank.holding ( - holding_id int, - user_id int, - holding_stock varchar(8), - holding_quantity int, - datetime_created timestamp, - datetime_updated timestamp, - primary key(holding_id) -); -ALTER TABLE bank.holding replica identity FULL; -insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now()); -\q -insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now()); -insert into bank.holding values (1001, 2, 'SP500', 1, now(), now()); -insert into bank.holding values (1003, 3, 'SP500', 1, now(), now()); -update bank.holding set holding_quantity = 300 where holding_id=1000; - -``` - -目标表:bank1.holding -``` -create database bank1; -CREATE TABLE holding ( - holding_id int, - user_id int, - holding_stock varchar(8), - holding_quantity int, - datetime_created bigint, - datetime_updated bigint, - primary key(holding_id) -); - -``` - -## 启动Connector - -### 启动Debezium source connector - -同步源表数据:bank.holding -作用:通过解析Postgres的WAL日志,封装成通用的ConnectRecord对象,发送到RocketMQ Topic当中 - -``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{ - "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector", - "max.task": "1", - 
"connect.topicname": "debezium-postgres-source-01", - "kafka.transforms": "Unwrap", - "kafka.transforms.Unwrap.delete.handling.mode": "none", - "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table", - "database.history.skip.unparseable.ddl": true, - "database.server.name": "bankserver1", - "database.port": 5432, - "database.hostname": "数据库ip", - "database.connectionTimeZone": "UTC", - "database.user": "start_data_engineer", - "database.dbname": "start_data_engineer", - "database.password": "password", - "table.whitelist": "bank.holding", - "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", - "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" -}' -``` - -### 启动 jdbc sink connector - -作用:消费Topic中的数据,通过JDBC协议写入到目标表当中 - -``` -curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{ - "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector", - "max.task": "2", - "connect.topicnames": "debezium-postgres-source-01", - "connection.url": "jdbc:mysql://数据库ip:3306/bank1", - "connection.user": "root", - "connection.password": "debezium", - "pk.fields": "holding_id", - "table.name.from.header": "true", - "pk.mode": "record_key", - "insert.mode": "UPSERT", - "db.timezone": "UTC", - "table.types": "TABLE", - "errors.deadletterqueue.topic.name": "dlq-topic", - "errors.log.enable": "true", - "errors.tolerance": "ALL", - "delete.enabled": "true", - "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", - "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" -}' - -``` - -以上两个Connector任务创建成功以后, -通过start_data_engineer/password账号登录数据库, -对源数据库表bank.holding进行增删改, -即可同步到目标表bank1.holding。 - - diff --git 
a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-Concept.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-Concept.png deleted file mode 100644 index b500ded2..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-Concept.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-process.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-process.png deleted file mode 100644 index 69c08cdf..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/Connector-Task-process.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy1.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy1.png deleted file mode 100644 index 3d7de5ba..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy1.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy2.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy2.png deleted file mode 100644 index 79039822..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy2.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy3.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy3.png deleted file mode 100644 index 185ef93b..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy3.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy4.png 
b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy4.png deleted file mode 100644 index 0fbe3254..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/deploy4.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/overview.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/overview.png deleted file mode 100644 index 22b6142d..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/overview.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/scene.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/scene.png deleted file mode 100644 index 90d05a6e..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/scene.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/worker.png b/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/worker.png deleted file mode 100644 index ad46d38a..00000000 Binary files a/i18n/en/docusaurus-plugin-content-docs/current/picture/32rocketmq-connect/worker.png and /dev/null differ diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0.json b/i18n/en/docusaurus-plugin-content-docs/version-5.0.json index a55e0fdd..34c6fb91 100644 --- a/i18n/en/docusaurus-plugin-content-docs/version-5.0.json +++ b/i18n/en/docusaurus-plugin-content-docs/version-5.0.json @@ -50,9 +50,5 @@ "sidebar.myAutogeneratedSidebar.category.部署运维": { "message": "Deployment Operations", "description": "The label for category 部署运维 in sidebar myAutogeneratedSidebar" - }, - "sidebar.myAutogeneratedSidebar.category.数据集成": { - "message": "Data Integration", - "description": "The label for category 数据集成 in sidebar myAutogeneratedSidebar" } } \ No newline at end of file 
diff --git "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" "b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" deleted file mode 100644 index 03d4216d..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" +++ /dev/null @@ -1,42 +0,0 @@ -# RocketMQ Connect Overview - -RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统, -它具备低延时,高靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。 - - - - -### Connector工作原理 - -RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,它主要为RocketMQ提供与各种外部系统的数据的流入流出能力。用户不需要编程,只需要简单的配置即可使用RocketMQ Connect,例如从MySQL同步数据到RocketMQ,只需要配置同步所需的MySQL的账号密码,链接地址,和需要同步的数据库,表名就可以了。 - -### Connector的使用场景 - -#####构建流式数据管道 - - - -在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。 - -#####CDC - -CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。 - -### Connector 部署 - -在创建Connector时,一般是通过配置完成的,Connector一般包含逻辑的Connector连接器和执行数据复制的Task即物理线程,如下图所示,两个Connector连接器和它们对应的运行Task任务。 - - - -一个Connector也可以同时运行多个任务,提高Connector的并行度,例如下图所示的Hudi Sink Connector有2个任务,每个任务处理不同的分片数据,从而Connector的并行度,进而提高处理性能。 - - - -RocketMQ Connect Worker支持两种运行模式,集群和单机 -集群模式,顾名思义,有多个Worker节点组成,推荐最少有2个Worker节点,组成高可用集群。集群间的配置信息,offset信息,status信息通过指定RocketMQ Topic存储,新增Worker节点也会获取到集群中的这些配置,offset,status信息,并且触发负载均衡,重新分配集群中的任务,使集群达到均衡的状态,减少Woker节点或者Worker宕机也会触发负载均衡,从而保障集群中所有的任务都可以均衡的在集群中存活的节点中正常运行。 - - - 
-单机模式,Connector任务运行在单机上,Worker本身没有高可用,任务offset信息持久化在本地。适合一些对高可没有什么要求或者不需要Worker保障高可用的场景,例如部署在k8s集群中,由k8s集群保障高可用。 - - diff --git "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md" "b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md" deleted file mode 100644 index 53c85258..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md" +++ /dev/null @@ -1,28 +0,0 @@ -# Concept - -## Connector - -连接器,定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector -,或从RocketMQ读数据写入到目标系统,这种是SinkConnector。Connector决定需要创建任务的数量,从Worker接收配置传递给任务。 - -## Task - -是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。 - - - -通过Connect的Api也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。 - -通过下面的两张图可以清楚的看到,Connecotr和Task处理基本流程。 - - - - -## Worker - -worker 进程是Connector和Task运行环境,它提供RESTFull能力,接受HTTP请求,将获取到的配置传递给Connector和Task。 -除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负责均衡能力实现的。 - - - -从上面面这张图,看到Worker通过提供的REST Api接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。 diff --git "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" "b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" deleted file mode 100644 index 1db8ccae..00000000 --- "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" +++ /dev/null @@ 
-1,182 +0,0 @@
-# Quick Start
-
-[](https://www.apache.org/licenses/LICENSE-2.0.html)
-
-In standalone mode, [rocketmq-connect-sample] serves as the demo.
-
-The rocketmq-connect-sample demo reads data from a source file and sends it to a RocketMQ cluster, then reads the messages back from the topic and writes them to a target file.
-
-## 1. Prerequisites
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-5. Create a test topic
-> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8
-
-
-**Tips**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-
-## 2. Build Connect
-
-```
-git clone https://github.com/apache/rocketmq-connect.git
-
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-
-```
-
-## 3. Run the Worker
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-
-```
-**Tips**: you can edit /bin/runconnect.sh to adjust the JVM parameters as needed
-
->JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
-
-The runtime has started successfully when you see:
-
->The standalone worker boot success.
-
-Check the startup log file:
-
->tail -100f ~/logs/rocketmqconnect/connect_runtime.log
-
-Press Ctrl+C to stop tailing the log.
-
-## 4. Start the source connector
-
-Create a test file test-source-file.txt in the current directory:
-```
-touch test-source-file.txt
-
-echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
-
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
-```
-
-The file source connector has started successfully when the log shows:
-
->tail -100f ~/logs/rocketmqconnect/connect_runtime.log
->
->2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-...
-
-#### Source connector configuration
-
-| key               | nullable | default | description                                                          |
-|-------------------|----------|---------|----------------------------------------------------------------------|
-| connector.class   | false    |         | Full class name (including package) of the Connector implementation  |
-| filename          | false    |         | Name of the source data file                                         |
-| connect.topicname | false    |         | Topic used to sync the file data                                     |
-
-
-## 5. Start the sink connector
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
-
-cat test-sink-file.txt
-```
-
-
-> tail -100f ~/logs/rocketmqconnect/connect_runtime.log
-
-The file sink connector has started successfully when the log shows:
-
-> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-...
-
-If test-sink-file.txt has been created and its content matches test-source-file.txt, the whole pipeline is working.
-The line order may differ between the two files; this is normal, because RocketMQ spreads messages across different queues, and messages from different queues may be received in a different order.
-
-#### Sink connector configuration
-
-| key                | nullable | default | description                                                          |
-|--------------------|----------|---------|----------------------------------------------------------------------|
-| connector.class    | false    |         | Full class name (including package) of the Connector implementation  |
-| filename           | false    |         | File into which the sink writes the data it pulls                    |
-| connect.topicnames | false    |         | Topics whose messages the sink consumes                              |
-
-```
-Note: the source/sink configuration above describes the rocketmq-connect-sample demo. Configuration differs between connectors; refer to the documentation of the specific source/sink connector.
-```
-
-## 6. Stop a connector
-
-```shell
-Send a GET request to
-http://(your worker ip):(port)/connectors/(connector name)/stop
-
-Stop the two demo connectors:
-curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
-curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
-
-```
-The connector has stopped successfully when the log shows:
-
->**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}}
-
-## 7. Stop the Worker process
-
-```
-sh bin/connectshutdown.sh
-```
-
-## 8. Log directory
-
->${user.home}/logs/rocketmqconnect
-
-## 9. Persistence files
-
-Default directory for persisted configuration: /tmp/storeRoot
-
-| key                  | description                                            |
-|----------------------|--------------------------------------------------------|
-| connectorConfig.json | Persisted connector configuration                      |
-| position.json        | Persisted processing position of source connectors     |
-| taskConfig.json      | Persisted task configuration                           |
-| offset.json          | Persisted consumption offset of sink connectors        |
-| connectorStatus.json | Persisted connector status                             |
-| taskStatus.json      | Persisted task status                                  |
-
-## 10. Configuration
-
-You can change the [RESTful](https://restfulapi.cn/) port, the storeRoot path, the Nameserver address, and so on as needed.
-
-File location: conf/connect-standalone.conf under the worker start directory
-
-```shell
-#current cluster node uniquely identifies
-workerId=DEFAULT_WORKER_1
-
-# Http port for user to access REST API
-httpPort=8082
-
-# Local file dir for config store
-storePathRootDir=/home/connect/storeRoot
-
-# Change this to your own RocketMQ nameserver endpoint
-# RocketMQ namesrvAddr
-namesrvAddr=127.0.0.1:9876
-
-# Used to load Connector plugins, much like the JVM loading jars or classes at startup;
-# this directory holds the Connector implementation plugins. Files and directories are both supported.
-# Source or sink connector jar file dir
-pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar
-
-# Alternatively: keep the Connector implementation plugins in a dedicated folder
-# pluginPaths=/usr/local/connector-plugins/*
-```
\ No newline at end of file
diff --git "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" "b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
deleted file mode 100644
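The order-insensitive comparison from step 5 of the quick start can be scripted. A minimal shell sketch: the two files below are stand-ins created inline so the snippet is self-contained, whereas a real run would compare test-source-file.txt and test-sink-file.txt from steps 4 and 5.

```shell
# Stand-in files mimicking the demo; same lines, different order,
# as can happen when messages fan out across RocketMQ queues.
printf 'Hello\nRocketMQ\nConnect\n' > /tmp/demo-source.txt
printf 'RocketMQ\nHello\nConnect\n' > /tmp/demo-sink.txt

# The pipeline is healthy if both files hold the same lines in any order.
sort /tmp/demo-source.txt > /tmp/demo-source.sorted
sort /tmp/demo-sink.txt > /tmp/demo-sink.sorted
if cmp -s /tmp/demo-source.sorted /tmp/demo-sink.sorted; then
  echo "contents match"
else
  echo "contents differ: inspect ~/logs/rocketmqconnect/connect_runtime.log"
fi
```

With matching contents this prints `contents match`; on a mismatch, the runtime log is the first place to look.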
index 43de0359..00000000
--- "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
+++ /dev/null
@@ -1,239 +0,0 @@
-# RocketMQ Connect In Action 1
-
-MySQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)
-
-## Preparation
-
-### Start RocketMQ
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-
-**Tips**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-
-### Start Connect
-
-
-#### Build the connector plugins
-
-Debezium RocketMQ Connector
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
-$ mvn clean package -Dmaven.test.skip=true
-```
-
-Copy the built Debezium MySQL RocketMQ Connector package into the runtime plugin directory:
-```
-mkdir -p /usr/local/connector-plugins
-cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-JDBC Connector
-
-Copy the built JDBC Connector package into the runtime plugin directory:
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
-$ mvn clean package -Dmaven.test.skip=true
-cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-
-```
-
-#### Start the Connect runtime
-```
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-
-```
-
-Edit `connect-standalone.conf`; the key settings are shown below:
-```
-$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-$ vim conf/connect-standalone.conf
-```
-
-```
-workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
-
-## Http port for user to access REST API
-httpPort=8082
-
-# Rocketmq namesrvAddr
-namesrvAddr=localhost:9876
-
-# RocketMQ acl
-aclEnable=false
-accessKey=rocketmq
-secretKey=12345678
-
-autoCreateGroupEnable=false
-clusterName="DefaultCluster"
-
-# Key setting: point this at the plugin directory holding the Debezium package built above
-# Source or sink connector jar file dir. The default value is rocketmq-connect-sample
-pluginPaths=/usr/local/connector-plugins
-```
-
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-
-```
-
-### MySQL image
-Use Debezium's MySQL Docker image to set up the MySQL database:
-```
-docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
-```
-MySQL details
-
-Port: 3306
-
-Root account: root/debezium
-
-Replication account: debezium/dbz
-
-
-### Test data
-
-Log in to the database as root/debezium.
-
-Source table: inventory.employee
-
-```
-CREATE database inventory;
-
-use inventory;
-CREATE TABLE `employee` (
-`id` bigint NOT NULL AUTO_INCREMENT,
-`name` varchar(128) DEFAULT NULL,
-`howold` int DEFAULT NULL,
-`male` int DEFAULT NULL,
-`company` varchar(128) DEFAULT NULL,
-`money` double DEFAULT NULL,
-`begin_time` datetime DEFAULT NULL,
-`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
-`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
-PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;
-
-
-
-INSERT INTO `employee` VALUES (1, 'name-01', 24, 6, 'company', 9987, '2021-12-22 08:00:00', '2022-06-14 18:20:11', 321.11);
-INSERT INTO `employee` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00', '2022-06-14 18:18:47', 77.12);
-INSERT INTO `employee` VALUES (8, 'name-03', 20, 1, NULL, 0, NULL, '2022-06-14 18:26:05', 11111.00);
-INSERT INTO `employee` VALUES (9, 'name-04', 21, 1, 'company', 12345, '2021-12-24 20:44:10', '2022-06-14 18:20:02', 123.12);
-INSERT INTO `employee` VALUES (11, 'name-05', 50, 2, 'company', 33333, '2021-12-24 22:14:52', '2022-06-14 18:19:58', 123.12);
-INSERT INTO `employee` VALUES (12, 'name-06', 19, 3, NULL, 0, NULL, '2022-06-14 18:26:12', 111233.00);
-INSERT INTO `employee` VALUES (13, 'name-07', 20, 4, 'company', 3237, '2021-12-29 01:31:03', '2022-06-14 18:19:27', 52.00);
-INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39', '2022-06-14 18:18:32', 0.00);
-INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-```
-
-Target table: inventory_2.employee
-```
-CREATE database inventory_2;
-use inventory_2;
-CREATE TABLE `employee` (
-`id` bigint NOT NULL AUTO_INCREMENT,
-`name` varchar(128) DEFAULT NULL,
-`howold` int DEFAULT NULL,
-`male` int DEFAULT NULL,
-`company` varchar(128) DEFAULT NULL,
-`money` double DEFAULT NULL,
-`begin_time` datetime DEFAULT NULL,
-`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
-`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
-PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;
-```
-
-## Start the connectors
-
-### Start the Debezium source connector
-
-Syncs the source table inventory.employee.
-What it does: parses the MySQL binlog, wraps the changes into generic ConnectRecord objects, and sends them to a RocketMQ topic.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
-"connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
-"max.task": "1",
-"connect.topicname": "debezium-mysql-source",
-"kafka.transforms": "Unwrap",
-"kafka.transforms.Unwrap.delete.handling.mode": "none",
-"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
-"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
-"database.history.skip.unparseable.ddl": true,
-"database.history.name.srv.addr": "localhost:9876",
-"database.history.rocketmq.topic": "db-history-debezium-topic",
-"database.history.store.only.monitored.tables.ddl": true,
-"include.schema.changes": false,
-"database.server.name": "dbserver1",
-"database.port": 3306,
-"database.hostname": "<database IP>",
-"database.connectionTimeZone": "UTC",
-"database.user": "debezium",
-"database.password": "dbz",
-"table.include.list": "inventory.employee",
-"max.batch.size": 50,
-"database.include.list": "inventory",
-"snapshot.mode": "when_needed",
-"database.server.id": "184054",
-"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-### Start the JDBC sink connector
-
-What it does: consumes the data from the topic and writes it into the target table over JDBC.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{
-    "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
-    "max.task": "2",
-    "connect.topicnames": "debezium-mysql-source",
-    "connection.url": "jdbc:mysql://<database IP>:3306/inventory_2",
-    "connection.user": "root",
-    "connection.password": "debezium",
-    "pk.fields": "id",
-    "table.name.from.header": "true",
-    "pk.mode": "record_key",
-    "insert.mode": "UPSERT",
-    "db.timezone": "UTC",
-    "table.types": "TABLE",
-    "errors.deadletterqueue.topic.name": "dlq-topic",
-    "errors.log.enable": "true",
-    "errors.tolerance": "ALL",
-    "delete.enabled": "true",
-    "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-    "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-
-Once both connector tasks have been created, log in to the database as root/debezium.
-
-Inserts, updates, and deletes on the source table inventory.employee will then be synchronized to the target table inventory_2.employee.
-
-
diff --git "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" "b/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
deleted file mode 100644
index 6404152c..00000000
--- "a/i18n/en/docusaurus-plugin-content-docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
+++ /dev/null
@@ -1,233 +0,0 @@
-#
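The connector payloads in this walkthrough are long single-line JSON strings that are easy to mangle inside a curl command (quoting, a dropped `-d` flag). A defensive pattern, sketched here with an abbreviated payload — the fields are demo values from this page, not a complete configuration — is to write the JSON to a file, validate it, and only then submit it. Assumes `python3` is on the PATH.

```shell
# Write an abbreviated source-connector payload to a file. This is NOT the
# full config from the walkthrough; just enough fields to show the pattern.
cat > /tmp/mysql-cdc-source.json <<'EOF'
{
  "max.task": "1",
  "connect.topicname": "debezium-mysql-source",
  "database.port": 3306,
  "database.user": "debezium",
  "table.include.list": "inventory.employee"
}
EOF

# Fail fast on malformed JSON before ever touching the worker.
python3 -m json.tool /tmp/mysql-cdc-source.json > /dev/null && echo "payload ok"

# Then submit it (worker address as in the walkthrough):
#   curl -X POST -H "Content-Type: application/json" \
#     http://127.0.0.1:8082/connectors/MySQLCDCSource -d @/tmp/mysql-cdc-source.json
```

Keeping the payload in a file also makes it easy to diff the source and sink configurations when debugging.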
RocketMQ Connect In Action 2
-
-PostgreSQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)
-
-## Preparation
-
-### Start RocketMQ
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-
-**Tips**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-
-### Start Connect
-
-
-#### Build the connector plugins
-
-Debezium RocketMQ Connector
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
-$ mvn clean package -Dmaven.test.skip=true
-```
-
-Copy the built Debezium PostgreSQL RocketMQ Connector package into the runtime plugin directory:
-```
-mkdir -p /usr/local/connector-plugins
-cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-JDBC Connector
-
-Copy the built JDBC Connector package into the runtime plugin directory:
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
-$ mvn clean package -Dmaven.test.skip=true
-cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-
-```
-
-#### Start the Connect runtime
-
-```
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-
-```
-
-Edit `connect-standalone.conf`; the key settings are shown below:
-```
-$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-$ vim conf/connect-standalone.conf
-```
-
-```
-workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
-
-## Http port for user to access REST API
-httpPort=8082
-
-# Rocketmq namesrvAddr
-namesrvAddr=localhost:9876
-
-# RocketMQ acl
-aclEnable=false
-accessKey=rocketmq
-secretKey=12345678
-
-autoCreateGroupEnable=false
-clusterName="DefaultCluster"
-
-# Key setting: point this at the plugin directory holding the Debezium package built above
-# Source or sink connector jar file dir. The default value is rocketmq-connect-sample
-pluginPaths=/usr/local/connector-plugins
-```
-
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-
-```
-
-### Postgres image
-
-Use Debezium's Postgres Docker image to set up the PostgreSQL database:
-```
-# starting a pg instance
-docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14
-
-# bash into postgres instance
-docker exec -ti postgres /bin/bash
-```
-Postgres details
-Port: 5432
-Account: start_data_engineer/password
-Source table to sync: bank.holding
-Target table: bank1.holding
-
-### MySQL image
-
-Use Debezium's MySQL Docker image to set up the MySQL database:
-```
-docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
-```
-MySQL details
-
-Port: 3306
-
-Account: root/debezium
-
-
-### Test data
-
-Log in to the database as start_data_engineer/password.
-
-Source table: bank.holding
-
-```
-CREATE SCHEMA bank;
-SET search_path TO bank,public;
-CREATE TABLE bank.holding (
-    holding_id int,
-    user_id int,
-    holding_stock varchar(8),
-    holding_quantity int,
-    datetime_created timestamp,
-    datetime_updated timestamp,
-    primary key(holding_id)
-);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-\q
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
-
-```
-
-Target table: bank1.holding
-```
-create database bank1;
-CREATE TABLE holding (
-    holding_id int,
-    user_id int,
-    holding_stock varchar(8),
-    holding_quantity int,
-    datetime_created bigint,
-    datetime_updated bigint,
-    primary key(holding_id)
-);
-
-```
-
-## Start the connectors
-
-### Start the Debezium source connector
-
-Syncs the source table bank.holding.
-What it does: parses the PostgreSQL change log (WAL), wraps the changes into generic ConnectRecord objects, and sends them to a RocketMQ topic.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
-    "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
-    "max.task": "1",
-    "connect.topicname": "debezium-postgres-source-01",
-    "kafka.transforms": "Unwrap",
-    "kafka.transforms.Unwrap.delete.handling.mode": "none",
-    "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
-    "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
-    "database.history.skip.unparseable.ddl": true,
-    "database.server.name": "bankserver1",
-    "database.port": 5432,
-    "database.hostname": "<database IP>",
-    "database.connectionTimeZone": "UTC",
-    "database.user": "start_data_engineer",
-    "database.dbname": "start_data_engineer",
-    "database.password": "password",
-    "table.whitelist": "bank.holding",
-    "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-    "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-### Start the JDBC sink connector
-
-What it does: consumes the data from the topic and writes it into the target table over JDBC.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
-    "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
-    "max.task": "2",
-    "connect.topicnames": "debezium-postgres-source-01",
-    "connection.url": "jdbc:mysql://<database IP>:3306/bank1",
-    "connection.user": "root",
-    "connection.password": "debezium",
-    "pk.fields": "holding_id",
-    "table.name.from.header": "true",
-    "pk.mode": "record_key",
-    "insert.mode": "UPSERT",
-    "db.timezone": "UTC",
-    "table.types": "TABLE",
-    "errors.deadletterqueue.topic.name": "dlq-topic",
-    "errors.log.enable": "true",
-    "errors.tolerance": "ALL",
-    "delete.enabled": "true",
-    "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-    "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-
-```
-
-Once both connector tasks have been created, log in to the database as start_data_engineer/password.
-
-Inserts, updates, and deletes on the source table bank.holding will then be synchronized to the target table bank1.holding.
-
-
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png
deleted file mode 100644
index b500ded2..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png
deleted file mode 100644
index 69c08cdf..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy1.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy1.png
deleted file mode 100644
index 3d7de5ba..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy1.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy2.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy2.png
deleted file mode 100644
index 79039822..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy2.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy3.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy3.png
deleted file mode
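One easy mistake in the two payloads above is letting the source's `connect.topicname` and the sink's `connect.topicnames` drift apart, in which case the sink silently consumes nothing. A small sketch (demo topic name from this walkthrough, hypothetical variable names) that derives both fields from a single variable:

```shell
# Single source of truth for the topic both connectors share.
TOPIC="debezium-postgres-source-01"

# Build the matching JSON fragments for the source and sink payloads.
SOURCE_FIELD=$(printf '"connect.topicname": "%s"' "$TOPIC")
SINK_FIELD=$(printf '"connect.topicnames": "%s"' "$TOPIC")

echo "$SOURCE_FIELD"
echo "$SINK_FIELD"
```

Splicing these fragments into the payloads (or templating the whole payload the same way) guarantees the two connectors agree on the topic.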
100644
index 185ef93b..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy3.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy4.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy4.png
deleted file mode 100644
index 0fbe3254..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/deploy4.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/overview.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/overview.png
deleted file mode 100644
index 22b6142d..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/overview.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/scene.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/scene.png
deleted file mode 100644
index 90d05a6e..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/scene.png and /dev/null differ
diff --git a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/worker.png b/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/worker.png
deleted file mode 100644
index ad46d38a..00000000
Binary files a/i18n/en/docusaurus-plugin-content-docs/version-5.0/picture/32rocketmq-connect/worker.png and /dev/null differ
diff --git "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md" "b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
deleted file mode 100644
index 4324b210..00000000
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/25RocketMQ Connect Overview.md"
+++ /dev/null
@@ -1,42 +0,0 @@
-# RocketMQ Connect Overview
-
-RocketMQ Connect is a key data-integration component of RocketMQ. It streams data into and out of RocketMQ from all kinds of systems in an efficient and reliable way. It is a separate distributed, scalable, fault-tolerant system, independent of RocketMQ itself,
-with low latency, high reliability, high performance, low code, and strong extensibility. It can connect heterogeneous data systems to build data pipelines, ETL, CDC, data lakes, and more.
-
-
-
-
-### How connectors work
-
-RocketMQ Connect is an independent distributed, scalable, fault-tolerant system whose main job is to stream data between RocketMQ and various external systems. Users need no programming, only simple configuration, to use RocketMQ Connect: for example, to sync data from MySQL to RocketMQ you only configure the MySQL account, password, and connection address, plus the databases and tables to sync.
-
-### Connector use cases
-
-##### Building streaming data pipelines
-
-
-
-In a business system, you might rely on MySQL's solid transaction support to handle inserts, updates, and deletes; use ElasticSearch or Solr for powerful search; and sync the business data into an analytics system or data lake (such as Hudi) for further processing that extracts more value from it. With RocketMQ Connect such a pipeline takes only three tasks: one that pulls data from MySQL, and two that consume from RocketMQ into ElasticSearch and Hudi respectively. Those three tasks form two data pipelines, MySQL to ElasticSearch and MySQL to Hudi, covering the transactional and search requirements while also feeding the data lake.
-
-##### CDC
-
-CDC, one of the ETL patterns, incrementally captures database INSERT, UPDATE, and DELETE changes in near real time. RocketMQ Connect's streaming data transfer, with its high availability and low latency, makes CDC easy to implement through a connector.
-
-### Connector deployment
-
-Connectors are usually created through configuration. A connector generally comprises the logical Connector and the Tasks, the physical threads that actually copy the data; the figure below shows two connectors and their corresponding running tasks.
-
-
-
-A connector can also run multiple tasks at once to raise its parallelism. For example, the Hudi sink connector shown below has two tasks, each handling a different shard of the data, which raises the connector's parallelism and thus its processing throughput.
-
-
-
-A RocketMQ Connect worker supports two run modes: cluster and standalone.
-Cluster mode, as the name implies, consists of multiple worker nodes; at least two are recommended for a highly available cluster. The cluster's config, offset, and status information is stored in designated RocketMQ topics. A newly added worker node picks up this config/offset/status information and triggers rebalancing, redistributing the cluster's tasks until the cluster is balanced; removing a worker node, or a worker crashing, also triggers rebalancing, so all tasks keep running evenly across the surviving nodes.
-
-
-
-Standalone mode runs connector tasks on a single machine. The worker itself is not highly available, and task offset information is persisted locally. It suits scenarios with no high-availability requirement, or where the worker need not provide high availability itself, e.g. deployment in a Kubernetes cluster where Kubernetes supplies the availability guarantees.
-
-
diff --git "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md" "b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
deleted file mode 100644
index f0981b4c..00000000
--- "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/26RocketMQ Connect Concept.md"
+++ /dev/null
@@ -1,28 +0,0 @@
-# Concepts
-
-## Connector
-
-A connector defines where data is copied from and to: reading from a source system and writing into RocketMQ is a SourceConnector,
-and reading from RocketMQ and writing into a target system is a SinkConnector. The connector decides how many tasks need to be created and passes the configuration it receives from the worker on to those tasks.
-
-## Task
-
-A task is the smallest unit into which a connector's work is sharded. It is the actual executor that copies data from the source system into RocketMQ (SourceTask), or reads data from RocketMQ and writes it into the target system (SinkTask). Tasks are stateless and can be started and stopped dynamically; multiple tasks can run in parallel, and a connector's copy parallelism is mainly reflected in its task count.
-
-
-
-The Connect API also shows the respective responsibilities of Connector and Task. A Connector implementation fixes the direction of the data flow: the Connector receives the data-source configuration, taskClass gives the task type to create, and taskConfigs sets the maximum number of tasks and assigns each task its configuration. With its configuration, a task pulls data from the source and writes it to the target store.
-
-The two figures below show the basic processing flow of Connector and Task.
-
-
-
-
-## Worker
-
-The worker process is the runtime environment for connectors and tasks. It provides a RESTful interface, accepts HTTP requests, and passes the received configuration on to connectors and tasks.
-It is also responsible for starting connectors and tasks, persisting connector configuration, persisting the positions of the data tasks sync, and load balancing; Connect cluster high availability, scaling, and failure handling mainly rely on the worker's load-balancing capability.
-
-
-
-As the figure above shows, the worker receives HTTP requests through its REST API and hands the received configuration to the config-management service, which saves it locally, syncs it to the other worker nodes, and triggers rebalancing.
diff --git "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md" "b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md"
deleted file mode 100644
index 31c95895..00000000
--- "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/27RocketMQ Connect Quick Start.md"
+++ /dev/null
@@ -1,182 +0,0 @@
-# Quick Start
-
-[](https://www.apache.org/licenses/LICENSE-2.0.html)
-
-In standalone mode, [rocketmq-connect-sample] serves as the demo.
-
-The rocketmq-connect-sample demo reads data from a source file and sends it to a RocketMQ cluster, then reads the messages back from the topic and writes them to a target file.
-
-## 1. Prerequisites
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-5.
Create a test topic
-> sh ${ROCKETMQ_HOME}/bin/mqadmin updateTopic -t fileTopic -n localhost:9876 -c DefaultCluster -r 8 -w 8
-
-
-**Tips**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-
-## 2. Build Connect
-
-```
-git clone https://github.com/apache/rocketmq-connect.git
-
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-
-```
-
-## 3. Run the Worker
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-
-```
-**Tips**: you can edit /bin/runconnect.sh to adjust the JVM parameters as needed
-
->JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
-
-The runtime has started successfully when you see:
-
->The standalone worker boot success.
-
-Check the startup log file:
-
->tail -100f ~/logs/rocketmqconnect/connect_runtime.log
-
-Press Ctrl+C to stop tailing the log.
-
-## 4. Start the source connector
-
-Create a test file test-source-file.txt in the current directory:
-```
-touch test-source-file.txt
-
-echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
-
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
-```
-
-The file source connector has started successfully when the log shows:
-
->tail -100f ~/logs/rocketmqconnect/connect_runtime.log
->
->2019-07-16 11:18:39 INFO pool-7-thread-1 - **Source task start**, config:{"properties":{"source-record-...
-
-#### Source connector configuration
-
-| key               | nullable | default | description                                                          |
-|-------------------|----------|---------|----------------------------------------------------------------------|
-| connector.class   | false    |         | Full class name (including package) of the Connector implementation  |
-| filename          | false    |         | Name of the source data file                                         |
-| connect.topicname | false    |         | Topic used to sync the file data                                     |
-
-
-## 5. Start the sink connector
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
-
-cat test-sink-file.txt
-```
-
-
-> tail -100f ~/logs/rocketmqconnect/connect_runtime.log
-
-The file sink connector has started successfully when the log shows:
-
-> 2019-07-16 11:24:58 INFO pool-7-thread-2 - **Sink task start**, config:{"properties":{"source-record-...
-
-If test-sink-file.txt has been created and its content matches test-source-file.txt, the whole pipeline is working.
-The line order may differ between the two files; this is normal, because RocketMQ spreads messages across different queues, and messages from different queues may be received in a different order.
-
-#### Sink connector configuration
-
-| key                | nullable | default | description                                                          |
-|--------------------|----------|---------|----------------------------------------------------------------------|
-| connector.class    | false    |         | Full class name (including package) of the Connector implementation  |
-| filename           | false    |         | File into which the sink writes the data it pulls                    |
-| connect.topicnames | false    |         | Topics whose messages the sink consumes                              |
-
-```
-Note: the source/sink configuration above describes the rocketmq-connect-sample demo. Configuration differs between connectors; refer to the documentation of the specific source/sink connector.
-```
-
-## 6. Stop a connector
-
-```shell
-Send a GET request to
-http://(your worker ip):(port)/connectors/(connector name)/stop
-
-Stop the two demo connectors:
-curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
-curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
-
-```
-The connector has stopped successfully when the log shows:
-
->**Source task stop**, config:{"properties":{"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter","filename":"/home/zhoubo/IdeaProjects/my-new3-rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/source-file.txt","task-class":"org.apache.rocketmq.connect.file.FileSourceTask","topic":"fileTopic","connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector","update-timestamp":"1564765189322"}}
-
-## 7. Stop the Worker process
-
-```
-sh bin/connectshutdown.sh
-```
-
-## 8. Log directory
-
->${user.home}/logs/rocketmqconnect
-
-## 9. Persistence files
-
-Default directory for persisted configuration: /tmp/storeRoot
-
-| key                  | description                                            |
-|----------------------|--------------------------------------------------------|
-| connectorConfig.json | Persisted connector configuration                      |
-| position.json        | Persisted processing position of source connectors     |
-| taskConfig.json      | Persisted task configuration                           |
-| offset.json          | Persisted consumption offset of sink connectors        |
-| connectorStatus.json | Persisted connector status                             |
-| taskStatus.json      | Persisted task status                                  |
-
-## 10. Configuration
-
-You can change the [RESTful](https://restfulapi.cn/) port, the storeRoot path, the Nameserver address, and so on as needed.
-
-File location: conf/connect-standalone.conf under the worker start directory
-
-```shell
-#current cluster node uniquely identifies
-workerId=DEFAULT_WORKER_1
-
-# Http port for user to access REST API
-httpPort=8082
-
-# Local file dir for config store
-storePathRootDir=/home/connect/storeRoot
-
-# Change this to your own RocketMQ nameserver endpoint
-# RocketMQ namesrvAddr
-namesrvAddr=127.0.0.1:9876
-
-# Used to load Connector plugins, much like the JVM loading jars or classes at startup;
-# this directory holds the Connector implementation plugins. Files and directories are both supported.
-# Source or sink connector jar file dir
-pluginPaths=rocketmq-connect-sample/target/rocketmq-connect-sample-0.0.1-SNAPSHOT.jar
-
-# Alternatively: keep the Connector implementation plugins in a dedicated folder
-# pluginPaths=/usr/local/connector-plugins/*
-```
\ No newline at end of file
diff --git "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md" "b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
deleted file mode 100644
index fbd1ba41..00000000
---
"a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/28RocketMQ Connect In Action1.md"
+++ /dev/null
@@ -1,239 +0,0 @@
-# RocketMQ Connect in Action 1
-
-MySQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)
-
-## Preparation
-
-### Start RocketMQ
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-**Tip**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-### Start Connect
-
-#### Build the connector plugins
-
-Debezium RocketMQ Connector
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
-$ mvn clean package -Dmaven.test.skip=true
-```
-
-Copy the compiled Debezium MySQL RocketMQ Connector package into the runtime plugin directory:
-```
-mkdir -p /usr/local/connector-plugins
-cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-JDBC Connector
-
-Copy the compiled JDBC Connector package into the runtime plugin directory:
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
-$ mvn clean package -Dmaven.test.skip=true
-$ cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-#### Start the Connect runtime
-```
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-```
-
-Edit `connect-standalone.conf`; the key settings are shown below:
-```
-$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-$ vim conf/connect-standalone.conf
-```
-
-```
-workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
-
-## Http port for user to access REST API
-httpPort=8082
-
-# RocketMQ namesrvAddr
-namesrvAddr=localhost:9876
-
-# RocketMQ acl
-aclEnable=false
-accessKey=rocketmq
-secretKey=12345678
-
-autoCreateGroupEnable=false
-clusterName="DefaultCluster"
-
-# Key setting: point this at the plugin directory that holds the compiled Debezium package
-# Source or sink connector jar file dir; the default value is rocketmq-connect-sample
-pluginPaths=/usr/local/connector-plugins
-```
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-```
-
-### MySQL image
-Use Debezium's MySQL Docker image to set up the MySQL database:
-```
-docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
-```
-MySQL details
-
-Port: 3306
-
-Account: root/debezium
-
-Replication (slave) account: debezium/dbz
-
-### Test data
-
-Log in to the database as root/debezium.
-
-Source table: inventory.employee
-
-```
-CREATE database inventory;
-
-use inventory;
-CREATE TABLE `employee` (
-`id` bigint NOT NULL AUTO_INCREMENT,
-`name` varchar(128) DEFAULT NULL,
-`howold` int DEFAULT NULL,
-`male` int DEFAULT NULL,
-`company` varchar(128) DEFAULT NULL,
-`money` double DEFAULT NULL,
-`begin_time` datetime DEFAULT NULL,
-`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
-PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;
-
-INSERT INTO `employee` VALUES (1, 'name-01', 24, 6, 'company', 9987, '2021-12-22 08:00:00', '2022-06-14 18:20:11', 321.11);
-INSERT INTO `employee` VALUES (2, 'name-02', 19, 7, 'company', 32232, '2021-12-29 08:00:00', '2022-06-14 18:18:47', 77.12);
-INSERT INTO `employee` VALUES (8, 'name-03', 20, 1, NULL, 0, NULL, '2022-06-14 18:26:05', 11111.00);
-INSERT INTO `employee` VALUES (9, 'name-04', 21, 1, 'company', 12345, '2021-12-24 20:44:10', '2022-06-14 18:20:02', 123.12);
-INSERT INTO `employee` VALUES (11, 'name-05', 50, 2, 'company', 33333, '2021-12-24 22:14:52', '2022-06-14 18:19:58', 123.12);
-INSERT INTO `employee` VALUES (12, 'name-06', 19, 3, NULL, 0, NULL, '2022-06-14 18:26:12', 111233.00);
-INSERT INTO `employee` VALUES (13, 'name-07', 20, 4, 'company', 3237, '2021-12-29 01:31:03', '2022-06-14 18:19:27', 52.00);
-INSERT INTO `employee` VALUES (14, 'name-08', 25, 15, 'company', 32255, '2022-02-08 19:06:39', '2022-06-14 18:18:32', 0.00);
-INSERT INTO `employee` VALUES (15, NULL, 0, 0, NULL, 0, NULL, '2022-06-14 20:13:29', NULL);
-```
-
-Target table: inventory_2.employee
-```
-CREATE database inventory_2;
-use inventory_2;
-CREATE TABLE `employee` (
-`id` bigint NOT NULL AUTO_INCREMENT,
-`name` varchar(128) DEFAULT NULL,
-`howold` int DEFAULT NULL,
-`male` int DEFAULT NULL,
-`company` varchar(128) DEFAULT NULL,
-`money` double DEFAULT NULL,
-`begin_time` datetime DEFAULT NULL,
-`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-`decimal_test` decimal(11,2) DEFAULT NULL COMMENT 'test decimal type',
-PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;
-```
-
-## Start the connectors
-
-### Start the Debezium source connector
-
-Syncs the source table: inventory.employee
-Purpose: parses the MySQL binlog, wraps each change into a generic ConnectRecord object, and sends it to a RocketMQ topic.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource -d '{
-"connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
-"max.task": "1",
-"connect.topicname": "debezium-mysql-source",
-"kafka.transforms": "Unwrap",
-"kafka.transforms.Unwrap.delete.handling.mode": "none",
-"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
-"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
-"database.history.skip.unparseable.ddl": true,
-"database.history.name.srv.addr": "localhost:9876",
-"database.history.rocketmq.topic": "db-history-debezium-topic",
-"database.history.store.only.monitored.tables.ddl": true,
-"include.schema.changes": false,
-"database.server.name": "dbserver1",
-"database.port": 3306,
-"database.hostname": "<database ip>",
-"database.connectionTimeZone": "UTC",
-"database.user": "debezium",
-"database.password": "dbz",
-"table.include.list": "inventory.employee",
-"max.batch.size": 50,
-"database.include.list": "inventory",
-"snapshot.mode": "when_needed",
-"database.server.id": "184054",
-"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-### Start the JDBC sink connector
-
-Purpose: consumes data from the topic and writes it to the target table over JDBC.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest -d '{
-  "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
-  "max.task": "2",
-  "connect.topicnames": "debezium-mysql-source",
-  "connection.url": "jdbc:mysql://<database ip>:3306/inventory_2",
-  "connection.user": "root",
-  "connection.password": "debezium",
-  "pk.fields": "id",
-  "table.name.from.header": "true",
-  "pk.mode": "record_key",
-  "insert.mode": "UPSERT",
-  "db.timezone": "UTC",
-  "table.types": "TABLE",
-  "errors.deadletterqueue.topic.name": "dlq-topic",
-  "errors.log.enable": "true",
-  "errors.tolerance": "ALL",
-  "delete.enabled": "true",
-  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-Once both connector tasks have been created successfully, log in to the database as root/debezium.
-Inserts, updates, and deletes on the source table inventory.employee are synced to the target table inventory_2.employee.
-
diff --git "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md" "b/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
deleted file mode 100644
index 5abc1c2b..00000000
--- "a/versioned_docs/version-5.0/07-\346\225\260\346\215\256\351\233\206\346\210\220/29RocketMQ Connect In Action2.md"
+++ /dev/null
@@ -1,233 +0,0 @@
-# RocketMQ Connect in Action 2
-
-PostgreSQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)
-
-## Preparation
-
-### Start RocketMQ
-
-1. Linux/Unix/Mac
-2. 64-bit JDK 1.8+;
-3. Maven 3.2.x or later;
-4. Start [RocketMQ](https://rocketmq.apache.org/docs/quick-start/);
-
-**Tip**: location of ${ROCKETMQ_HOME}
-
->bin-release.zip version: /rocketmq-all-4.9.4-bin-release
->
->source-release.zip version: /rocketmq-all-4.9.4-source-release/distribution
-
-### Start Connect
-
-#### Build the connector plugins
-
-Debezium RocketMQ Connector
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-debezium-postgresql/
-$ mvn clean package -Dmaven.test.skip=true
-```
-
-Copy the compiled Debezium PostgreSQL RocketMQ Connector package into the runtime plugin directory:
-```
-mkdir -p /usr/local/connector-plugins
-cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-JDBC Connector
-
-Copy the compiled JDBC Connector package into the runtime plugin directory:
-```
-$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
-$ mvn clean package -Dmaven.test.skip=true
-$ cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
-```
-
-#### Start the Connect runtime
-
-```
-cd rocketmq-connect
-
-mvn -Prelease-connect -DskipTests clean install -U
-```
-
-Edit `connect-standalone.conf`; the key settings are shown below:
-```
-$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-$ vim conf/connect-standalone.conf
-```
-
-```
-workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
-
-## Http port for user to access REST API
-httpPort=8082
-
-# RocketMQ namesrvAddr
-namesrvAddr=localhost:9876
-
-# RocketMQ acl
-aclEnable=false
-accessKey=rocketmq
-secretKey=12345678
-
-autoCreateGroupEnable=false
-clusterName="DefaultCluster"
-
-# Key setting: point this at the plugin directory that holds the compiled Debezium package
-# Source or sink connector jar file dir; the default value is rocketmq-connect-sample
-pluginPaths=/usr/local/connector-plugins
-```
-
-```
-cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
-
-sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
-```
-
-### Postgres image
-
-Use Debezium's Postgres Docker image to set up the PostgreSQL database:
-```
-# start a pg instance
-docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14
-
-# bash into the postgres instance
-docker exec -ti postgres /bin/bash
-```
-Postgres details
-Port: 5432
-Account: start_data_engineer/password
-Source table to sync: bank.holding
-Target table: bank1.holding
-
-### MySQL image
-
-Use Debezium's MySQL Docker image to set up the MySQL database:
-```
-docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
-```
-MySQL details
-
-Port: 3306
-
-Account: root/debezium
-
-### Test data
-
-Log in to the database as start_data_engineer/password.
-
-Source table: bank.holding
-
-```
-CREATE SCHEMA bank;
-SET search_path TO bank,public;
-CREATE TABLE bank.holding (
-    holding_id int,
-    user_id int,
-    holding_stock varchar(8),
-    holding_quantity int,
-    datetime_created timestamp,
-    datetime_updated timestamp,
-    primary key(holding_id)
-);
-ALTER TABLE bank.holding replica identity FULL;
-insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
-insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
-insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
-update bank.holding set holding_quantity = 300 where holding_id=1000;
-\q
-```
-
-Target table: bank1.holding
-```
-create database bank1;
-CREATE TABLE holding (
-    holding_id int,
-    user_id int,
-    holding_stock varchar(8),
-    holding_quantity int,
-    datetime_created bigint,
-    datetime_updated bigint,
-    primary key(holding_id)
-);
-```
-
-## Start the connectors
-
-### Start the Debezium source connector
-
-Syncs the source table: bank.holding
-Purpose: reads change events from the PostgreSQL WAL, wraps each change into a generic ConnectRecord object, and sends it to a RocketMQ topic.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
-  "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
-  "max.task": "1",
-  "connect.topicname": "debezium-postgres-source-01",
-  "kafka.transforms": "Unwrap",
-  "kafka.transforms.Unwrap.delete.handling.mode": "none",
-  "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
-  "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
-  "database.history.skip.unparseable.ddl": true,
-  "database.server.name": "bankserver1",
-  "database.port": 5432,
-  "database.hostname": "<database ip>",
-  "database.connectionTimeZone": "UTC",
-  "database.user": "start_data_engineer",
-  "database.dbname": "start_data_engineer",
-  "database.password": "password",
-  "table.whitelist": "bank.holding",
-  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-### Start the JDBC sink connector
-
-Purpose: consumes data from the topic and writes it to the target table over JDBC.
-
-```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
-  "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
-  "max.task": "2",
-  "connect.topicnames": "debezium-postgres-source-01",
-  "connection.url": "jdbc:mysql://<database ip>:3306/bank1",
-  "connection.user": "root",
-  "connection.password": "debezium",
-  "pk.fields": "holding_id",
-  "table.name.from.header": "true",
-  "pk.mode": "record_key",
-  "insert.mode": "UPSERT",
-  "db.timezone": "UTC",
-  "table.types": "TABLE",
-  "errors.deadletterqueue.topic.name": "dlq-topic",
-  "errors.log.enable": "true",
-  "errors.tolerance": "ALL",
-  "delete.enabled": "true",
-  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
-  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
-}'
-```
-
-Once both connector tasks have been created successfully, log in to the database as start_data_engineer/password.
-Inserts, updates, and deletes on the source table bank.holding are synced to the target table bank1.holding.
-
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png
deleted file mode 100644
index b500ded2..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-Concept.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png
deleted file mode 100644
index 69c08cdf..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/Connector-Task-process.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy1.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy1.png
deleted file mode 100644
index 3d7de5ba..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy1.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy2.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy2.png
deleted file mode 100644
index 79039822..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy2.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy3.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy3.png
deleted file mode 100644
index 185ef93b..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy3.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy4.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy4.png
deleted file mode 100644
index 0fbe3254..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/deploy4.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/overview.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/overview.png
deleted file mode 100644
index 22b6142d..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/overview.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/scene.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/scene.png
deleted file mode 100644
index 90d05a6e..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/scene.png and /dev/null differ
diff --git a/versioned_docs/version-5.0/picture/32rocketmq-connect/worker.png b/versioned_docs/version-5.0/picture/32rocketmq-connect/worker.png
deleted file mode 100644
index ad46d38a..00000000
Binary files a/versioned_docs/version-5.0/picture/32rocketmq-connect/worker.png and /dev/null differ
