This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 13b6ae3ba93a56b156f872dbac4914faf34d9050 Author: yuchenlichuck <[email protected]> AuthorDate: Thu Jul 18 22:52:55 2019 +0800 Add Jdbc Source Connector and Do Unit Test --- pom.xml | 195 +++++++++++++++++++++ .../org/apache/rocketmq/connect/jdbc/Config.java | 137 +++++++++++++++ .../jdbc/connector/JdbcSourceConnector.java | 83 +++++++++ .../jdbc/connector/JdbcSourceConnectorTest.java | 85 +++++++++ 4 files changed, 500 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..361dd77 --- /dev/null +++ b/pom.xml @@ -0,0 +1,195 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-jdbc</artifactId> + <version>1.0.0</version> + + <name>connect-jdbc</name> + <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.51</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + <version>2.0</version> + </dependency> + + </dependencies> + +</project> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java new file mode 100644 index 0000000..3bc3032 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; +import javax.jms.Session; + +public class Config { + @SuppressWarnings("serial") + + public String jdbcAddr; + public Integer jdbcPort; + public String jdbcUsername; + public String jdbcPassword; + + public String queueName; + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("jdbcAddr"); + add("jdbcPort"); + add("jdbcUsername"); + add("jdbcPassword"); + } + }; + +// public String destinationType; +// +// public String destinationName; +// +// public String messageSelector; +// +// private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; +// +// private Boolean sessionTransacted = Boolean.FALSE; + + public void load(KeyValue props) { + + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + //Java Reflection Application + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg = null; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + + } + } + } catch (Throwable ignored) { + } + } + } + } + + public String getJdbcAddr() { + return jdbcAddr; + } + + public void setJdbcAddr(String JdbcAddr) { + this.jdbcAddr = jdbcAddr; + } + + public int getJdbcPort() { + return jdbcPort; + } + + public void setJdbcPort(Integer jdbcPort) { + this.jdbcPort = jdbcPort; + } + + public String getJdbcUsername() { + return jdbcUsername; + } + + public void setJdbcUsername(String jdbcUsername) { + this.jdbcUsername = jdbcUsername; + } + + public String getJdbcPassword() { + return jdbcPassword; + } + + public void setJdbcPassword(String jdbcPassword) { + this.jdbcPassword = jdbcPassword; + } + + + + +} \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java new file mode 100644 index 0000000..d6b0e3b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.connector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; + +import org.apache.rocketmq.connect.jdbc.Config; +//import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcSourceConnector extends SourceConnector { + private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class); + private KeyValue config; + + @Override + public String verifyAndSetConfig(KeyValue config) { + System.out.println(11+config.toString()); + log.info("JdbcSourceConnector verifyAndSetConfig enter"); + for (String requestKey : Config.REQUEST_CONFIG) { + System.out.println(12+requestKey); + if (!config.containsKey(requestKey)) { + return "Request config key: " + requestKey; + } + } + this.config = config; + System.out.println(11+config.toString()); + return ""; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + @Override + public Class<? extends Task> taskClass(){ + return JdbcSourceTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + List<KeyValue> config = new ArrayList<>(); + config.add(this.config); + return config; + } + + // abstract Set<String> getRequiredConfig(); +} diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java new file mode 100644 index 0000000..79e4e59 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jdbc.connector; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rocketmq.connect.jdbc.Config; +import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; + +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.internal.DefaultKeyValue; + +public class JdbcSourceConnectorTest { + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("jdbcAddr"); + add("jdbcPort"); + add("jdbcUsername"); + add("jdbcPassword"); + } + }; + + JdbcSourceConnector connector = new JdbcSourceConnector() { + + + @Override + public Class<? extends Task> taskClass() { + return JdbcSourceTask.class; + } + + +// Set<String> getRequiredConfig() { +// return REQUEST_CONFIG; +// } + }; + + @Test + public void verifyAndSetConfigTest() { + KeyValue keyValue = new DefaultKeyValue(); + + for (String requestKey : Config.REQUEST_CONFIG) { + assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + keyValue.put(requestKey, requestKey); + } + assertEquals(connector.verifyAndSetConfig(keyValue), ""); + } + + @Test + public void taskClassTest() { + assertEquals(connector.taskClass(), JdbcSourceTask.class); + } + + @Test + public void taskConfigsTest() { + assertEquals(connector.taskConfigs().get(0), null); + KeyValue keyValue = new DefaultKeyValue(); + for (String requestKey : Config.REQUEST_CONFIG) { + keyValue.put(requestKey, requestKey); + } + connector.verifyAndSetConfig(keyValue); + assertEquals(connector.taskConfigs().get(0), keyValue); + } +}
