This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 8ecde453ec4680fd8b37538bd7bd6312cce7fc00 Author: yuchenlichuck <[email protected]> AuthorDate: Fri Aug 2 00:20:44 2019 +0800 Add JdbcSourceTask and Schema --- README.md | 113 ++++++++++++++++++++- lib/mysql-connector-java-8.0.11.jar | Bin 0 -> 2036609 bytes pom.xml | 39 ++++++- .../org/apache/rocketmq/connect/jdbc/Config.java | 13 +-- .../jdbc/connector/JdbcSourceConnector.java | 9 +- .../connect/jdbc/connector/JdbcSourceTask.java | 16 +-- .../rocketmq/connect/jdbc/schema/Database.java | 2 - .../rocketmq/connect/jdbc/source/Querier.java | 8 +- .../rocketmq/connect/jdbc/ReplicatorTest.java | 74 ++++++++++++++ .../jdbc/connector/JdbcSourceConnectorTest.java | 6 +- 10 files changed, 254 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index f02d838..96da884 100644 --- a/README.md +++ b/README.md @@ -60,9 +60,9 @@ - For example ```javascript -SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null', -schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING}, -Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]} + SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null', + schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING}, + Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]} ``` #### Mentioned DataBase Information and all SourceDataEntry @@ -73,3 +73,110 @@ Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}  +**启动Connector** + +[http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms](http://127.0.0.1:8081/connectors/connector-name?config=%7B%22connector-class%22:%22org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector%22,%22oms-driver-url%22:%22oms): rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0. [...] + +**查看Connector运行状态** + +<http://127.0.0.1:8081/connectors/connector-name/status> + +**查看Connector配置** + +<http://127.0.0.1:8081/connectors/connector-name/config> + +**关闭Connector** + +<http://127.0.0.1:8081/connectors/connector-name/stop> + + + + + + + +# JDBC Connector 构建 + + + +#### 一、下载rocketmq-connect-runtime + +``` +1、git clone https://github.com/apache/rocketmq-externals.git + +2、cd rocketmq-externals/rocketmq-connect-runtime + +3、mvn -Dmaven.test.skip=true package + +4、cd target/distribution/conf +``` + +- a、修改connect.conf配置文件 + +``` +#1、rocketmq 配置 +namesrvAddr=127.0.0.1:9876 + +#2、file-connect jar包路径 +pluginPaths=/home/connect/file-connect/target + +#3、runtime持久化文件目录 +storePathRootDir=/home/connect/storeRoot + +#4、http服务端口 +httpPort=8081 +``` + + + + + +- b、日志相关配置在logback.xml中修改 + +``` +注:rocketmq需要先创建cluster-topic,config-topic,offset-topic,position-topic +4个topic,并且为了保证消息有序,每个topic可以只一个queue +``` + +### 二、启动Connector + +1、启动runtime +回到rocketmq-externals/rocketmq-connect-runtime目录 + +``` +./run_worker.sh +``` + +看到日志目录查看connect_runtime.log + +如果看到以下日志说明runttiime启动成功了 + +2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started +2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success. + +2、启动sourceConnector + + 正在做测试(To be continued)已实现Bulk Mode + +cd target/distribution/ + +java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf + + + +在http中输入Get 请求 + + + +示例 + +[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...] + + + + + + + + + diff --git a/lib/mysql-connector-java-8.0.11.jar b/lib/mysql-connector-java-8.0.11.jar new file mode 100644 index 0000000..c8b8b3f Binary files /dev/null and b/lib/mysql-connector-java-8.0.11.jar differ diff --git a/pom.xml b/pom.xml index dee7710..830f9ed 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,29 @@ <artifactId>findbugs-maven-plugin</artifactId> <version>3.0.4</version> </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.rocketmq.connect.jdbc.jdbcSourceConnector</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> @@ -159,6 +182,11 @@ <artifactId>openmessaging-connector</artifactId> <version>0.1.0-beta</version> </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>0.3.1-alpha</version> + </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> @@ -194,7 +222,16 @@ <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> - + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>8.0.11</version> + </dependency> + <dependency> + <groupId>io.javalin</groupId> + <artifactId>javalin</artifactId> + <version>1.3.0</version> + </dependency> </dependencies> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java index 69ff9b0..4f7456b 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java @@ -61,19 +61,20 @@ public class Config { public String dbTimezone="UTC"; public String queueName; + private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { - add("jdbcUrl"); - add("jdbcUsername"); - add("jdbcPassword"); - add("mode"); - add("rocketmqTopic"); + // add("jdbcUrl"); + // add("jdbcUsername"); + // add("jdbcPassword"); + // add("mode"); + // add("rocketmqTopic"); } }; public void load(KeyValue props) { - + log.info("Config.load.start"); properties2Object(props, this); } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index 8a6047c..8c30a62 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -26,7 +26,7 @@ import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; import org.apache.rocketmq.connect.jdbc.Config; -//import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; +import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,13 +36,16 @@ public class JdbcSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue config) { - log.info("JdbcSourceConnector verifyAndSetConfig enter"); + + log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter"); for (String requestKey : Config.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { return "Request config key: " + requestKey; } } this.config = config; + return ""; } @@ -71,9 +74,9 @@ public class JdbcSourceConnector extends SourceConnector { @Override public List<KeyValue> taskConfigs() { + log.info("List.start"); List<KeyValue> config = new ArrayList<>(); config.add(this.config); return config; } - } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index 32ea763..78f1809 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import org.apache.rocketmq.connect.jdbc.Config; -import org.apache.rocketmq.connect.jdbc.Replicator; import org.apache.rocketmq.connect.jdbc.schema.Table; import org.apache.rocketmq.connect.jdbc.source.Querier; import org.apache.rocketmq.connect.jdbc.schema.column.*; @@ -47,8 +46,6 @@ public class JdbcSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class); - private Replicator replicator; - private Config config; private List<Table> list=new LinkedList<>(); @@ -58,15 +55,20 @@ public class JdbcSourceTask extends SourceTask { public Collection<SourceDataEntry> poll() { List<SourceDataEntry> res = new ArrayList<>(); try { + JSONObject jsonObject = new JSONObject(); jsonObject.put("nextQuery", "database"); jsonObject.put("nextPosition", "10"); //To be Continued + log.info("querier.poll"); querier.poll(); - System.out.println(querier.getList().size()); + log.info("1216connector.start"); + int mm=0; for(Table dataRow : querier.getList()){ System.out.println(dataRow.getColList().get(0)); - Schema schema = new Schema(); + log.info("xunhuankaishi"); + log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++); + Schema schema = new Schema(); schema.setDataSource(dataRow.getDatabase()); schema.setName(dataRow.getName()); schema.setFields(new ArrayList<>()); @@ -102,7 +104,9 @@ public class JdbcSourceTask extends SourceTask { try { this.config = new Config(); this.config.load(props); + log.info("querier.start"); querier.start(); + } catch (Exception e) { log.error("JDBC task start failed.", e); } @@ -110,7 +114,7 @@ public class JdbcSourceTask extends SourceTask { @Override public void stop() { - replicator.stop(); + querier.stop(); } @Override public void pause() { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java index 15fb77b..b88661d 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java @@ -87,8 +87,6 @@ public class Database { private void addTable(String tableName) { - // LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName); - Table table = new Table(name, tableName); tableMap.put(tableName, table); } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index e99da74..1f630ea 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -65,7 +65,7 @@ public class Querier { return connection; } - public void close() { + public void stop() { Connection conn; while ((conn = connections.poll()) != null) { try { @@ -114,7 +114,6 @@ public class Querier { private Schema schema; - private Map<Long, Table> tableMap = new HashMap<>(); public void poll() { try { @@ -125,6 +124,8 @@ public class Querier { for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { String db = entry.getKey(); + if(!db.contains("jdbc_db")) + continue; Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); @@ -163,6 +164,7 @@ public class Querier { initDataSource(); schema = new Schema(dataSource); schema.load(); + log.info("schema load successful"); } private void initDataSource() throws Exception { @@ -179,7 +181,9 @@ public class Querier { map.put("minEvictableIdleTimeMillis", "300000"); map.put("validationQuery", "SELECT 1 FROM DUAL"); map.put("testWhileIdle", "true"); + log.info("{},config read successful",map); dataSource = DruidDataSourceFactory.createDataSource(map); + } } diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java new file mode 100644 index 0000000..88d5586 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java @@ -0,0 +1,74 @@ +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.rocketmq.connect.jms; +// +//import java.lang.reflect.Field; +// +//import javax.jms.Message; +// +//import org.apache.activemq.command.ActiveMQTextMessage; +//import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; +//import org.junit.Before; +//import org.junit.Test; +//import org.mockito.Mockito; +// +//import org.junit.Assert; +// +//public class ReplicatorTest { +// +// Replicator replicator; +// +// PatternProcessor patternProcessor; +// +// Config config; +// +// @Before +// public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { +// config = new Config(); +// replicator = new Replicator(config,null); +// +// patternProcessor = Mockito.mock(PatternProcessor.class); +// +// Field processor = Replicator.class.getDeclaredField("processor"); +// processor.setAccessible(true); +// processor.set(replicator, patternProcessor); +// } +// +// @Test(expected = RuntimeException.class) +// public void startTest() throws Exception { +// replicator.start(); +// } +// +// @Test +// public void stop() throws Exception { +// replicator.stop(); +// Mockito.verify(patternProcessor, Mockito.times(1)).stop(); +// } +// +// @Test +// public void commitAddGetQueueTest() { +// Message message = new ActiveMQTextMessage(); +// replicator.commit(message, false); +// Assert.assertEquals(replicator.getQueue().poll(), message); +// } +// +// @Test +// public void getConfigTest() { +// Assert.assertEquals(replicator.getConfig(), config); +// } +//} diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java index 97d87ee..297d517 100644 --- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java @@ -38,8 +38,6 @@ public class JdbcSourceConnectorTest { add("jdbcUrl"); add("jdbcUsername"); add("jdbcPassword"); - add("mode"); - add("rocketmqTopic"); } }; @@ -52,7 +50,9 @@ public class JdbcSourceConnectorTest { } - +// Set<String> getRequiredConfig() { +// return REQUEST_CONFIG; +// } }; @Test
