This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit e915f678c601912fec4b1f7438972f740ca5288b Author: yuchenlichuck <[email protected]> AuthorDate: Mon Jul 29 21:49:58 2019 +0800 Add JdbcSourceTask and Schema --- pom.xml | 12 +++ .../apache/rocketmq/connect/jdbc/Replicator.java | 118 --------------------- .../rocketmq/connect/jdbc/source/Querier.java | 2 - .../jdbc/connector/JdbcSourceConnectorTest.java | 9 +- .../connect/jdbc/connector/JdbcSourceTaskTest.java | 44 ++++++++ 5 files changed, 60 insertions(+), 125 deletions(-) diff --git a/pom.xml b/pom.xml index a53b8b0..dee7710 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.12</version> + </dependency> + <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-connector</artifactId> <version>0.1.0-beta</version> @@ -179,11 +184,18 @@ <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.2</version> </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>druid</artifactId> + <version>1.0.18</version> + </dependency> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> + + </dependencies> </project> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java deleted file mode 100644 index b24b7e5..0000000 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.connect.jdbc; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Replicator { - - private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); - - private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); - - private Config config; - - private EventProcessor eventProcessor; - - private Object lock = new Object(); - private BinlogPosition nextBinlogPosition; - private long nextQueueOffset; - private long xid; - private BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>(); - - public Replicator(Config config){ - this.config = config; - } - - public void start() { - - try { - - eventProcessor = new EventProcessor(this); - eventProcessor.start(); - - } catch (Exception e) { - LOGGER.error("Start error.", e); - } - } - - public void stop(){ - eventProcessor.stop(); - } - - public void commit(Transaction transaction, boolean isComplete) { - - queue.add(transaction); - for (int i = 0; i < 3; i++) { - try { - if (isComplete) { - long offset = 1; - synchronized (lock) { - xid = transaction.getXid(); - nextBinlogPosition = transaction.getNextBinlogPosition(); - nextQueueOffset = offset; - } - - } else { - } - break; - - } catch (Exception e) { - LOGGER.error("Push error,retry:" + (i + 1) + ",", e); - } - } - } - - public void logPosition() { - - String binlogFilename = null; - long xid = 0L; - long nextPosition = 0L; - long nextOffset = 0L; - - synchronized (lock) { - if (nextBinlogPosition != null) { - xid = this.xid; - binlogFilename = nextBinlogPosition.getBinlogFilename(); - nextPosition = nextBinlogPosition.getPosition(); - nextOffset = nextQueueOffset; - } - } - - if (binlogFilename != null) { - POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}", - xid, binlogFilename, nextPosition, nextOffset); - } - - } - - public Config getConfig() { - return config; - } - -// public BinlogPosition getNextBinlogPosition() { -// return nextBinlogPosition; -// } - - public BlockingQueue<Transaction> getQueue() { - return queue; - } -} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index 61323d4..e99da74 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -125,8 +125,6 @@ public class Querier { for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) { String db = entry.getKey(); - if(!db.contains("jdbc_db")) - continue; Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Table> tableEntry = iterator.next(); diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java index 79e4e59..97d87ee 100644 --- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java @@ -35,10 +35,11 @@ public class JdbcSourceConnectorTest { public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { - add("jdbcAddr"); - add("jdbcPort"); + add("jdbcUrl"); add("jdbcUsername"); add("jdbcPassword"); + add("mode"); + add("rocketmqTopic"); } }; @@ -51,9 +52,7 @@ public class JdbcSourceConnectorTest { } -// Set<String> getRequiredConfig() { -// return REQUEST_CONFIG; -// } + }; @Test diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java new file mode 100644 index 0000000..f9c8c6f --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.connector; +import java.util.Collection; +import org.junit.Test; + + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.internal.DefaultKeyValue; + +public class JdbcSourceTaskTest { + + + @Test + public void test() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("jdbcUrl","localhost:3306"); + kv.put("jdbcUsername","root"); + kv.put("jdbcPassword","199812160"); + kv.put("mode","bulk"); + kv.put("rocketmqTopic","JdbcTopic"); + JdbcSourceTask task = new JdbcSourceTask(); + task.start(kv); + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + System.out.println(sourceDataEntry); + + } +}
