This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit c7c0c04a7fd28d1067ec2c8700d404ac8dd2f391 Author: laohu <[email protected]> AuthorDate: Sun Jun 2 21:10:20 2019 +0800 init complete --- pom.xml | 259 +++++++++++++++++++++ .../java/io/openmessaging/activemq/Config.java | 133 +++++++++++ .../java/io/openmessaging/activemq/Replicator.java | 72 ++++++ .../activemq/connector/ActivemqConnector.java | 72 ++++++ .../activemq/connector/ActivemqTask.java | 87 +++++++ .../activemq/pattern/PatternProcessor.java | 83 +++++++ 6 files changed, 706 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..b021330 --- /dev/null +++ b/pom.xml @@ -0,0 +1,259 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-activemq</artifactId> + <version>1.0.0</version> + + <name>connect-activemq</name> + <description>Redis Replicator is a redis RDB and Command parser written in java. + It can parse,filter,broadcast the RDB and Command events in a real time manner + and resent to Apache RocketMQ, then consumer could subscribe topic to receive data. + </description> + <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-activemq</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <developers> + <developer> + <name>Leon Chen</name> + <email>[email protected]</email> + <organization>moilioncircle</organization> + <organizationUrl>http://www.moilioncircle.com/</organizationUrl> + <roles> + <role>Developer</role> + </roles> + <timezone>+8</timezone> + </developer> + + <developer> + <name>Adrian Yao</name> + <email>[email protected]</email> + <organization>unstudy</organization> + <timezone>+8</timezone> + </developer> + + <developer> + <name>Rick Zhang</name> + <email>[email protected]</email> + <organization>treefinance.com.cn</organization> + <roles> + <role>Developer</role> + </roles> + <timezone>+8</timezone> + </developer> + </developers> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connect-runtime</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>0.3.1-alpha</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.51</version> + </dependency> + <!-- <dependency> <groupId>io.javalin</groupId> <artifactId>javalin</artifactId> + <version>1.3.0</version> </dependency> --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-all</artifactId> + <version>5.9.0</version> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + <version>2.0</version> + </dependency> + + <dependency> + <groupId>org.glassfish.main.javaee-api</groupId> + <artifactId>javax.jms</artifactId> + <version>3.1.2.2</version> + </dependency> + + </dependencies> + +</project> diff --git a/src/main/java/io/openmessaging/activemq/Config.java b/src/main/java/io/openmessaging/activemq/Config.java new file mode 100644 index 0000000..10a5d9f --- /dev/null +++ b/src/main/java/io/openmessaging/activemq/Config.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.activemq; + +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + +import io.openmessaging.KeyValue; + +public class Config { + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("activemqUrl"); + add("activemqUsername"); + add("activemqPassword"); + add("destinationType"); + add("destinationName"); + } + }; + + public String activemqUrl; + + public String activemqUsername; + + public String activemqPassword; + + public String destinationType; + + public String destinationName; + + public void load(KeyValue props) { + + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } + + public String getActivemqUrl() { + return activemqUrl; + } + + public void setActivemqUrl(String activemqUrl) { + this.activemqUrl = activemqUrl; + } + + public String getActivemqUsername() { + return activemqUsername; + } + + public void setActivemqUsername(String activemqUsername) { + this.activemqUsername = activemqUsername; + } + + public String getActivemqPassword() { + return activemqPassword; + } + + public void setActivemqPassword(String activemqPassword) { + this.activemqPassword = activemqPassword; + } + + public String getDestinationType() { + return destinationType; + } + + public void setDestinationType(String destinationType) { + this.destinationType = destinationType; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } +} \ No newline at end of file diff --git a/src/main/java/io/openmessaging/activemq/Replicator.java b/src/main/java/io/openmessaging/activemq/Replicator.java new file mode 100644 index 0000000..51ca6c1 --- /dev/null +++ b/src/main/java/io/openmessaging/activemq/Replicator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.activemq; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.openmessaging.activemq.pattern.PatternProcessor; + +public class Replicator { + + private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); + + private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); + + private PatternProcessor processor; + + private Config config; + private Object lock = new Object(); + private BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + + public Replicator(Config config){ + this.config = config; + } + + public void start() { + + try { + processor = new PatternProcessor(this); + processor.start(); + + } catch (Exception e) { + LOGGER.error("Start error.", e); + } + } + + public void stop(){ + processor.stop(); + } + + public void commit(Message message, boolean isComplete) { + queue.add(message); + } + + public Config getConfig() { + return this.config; + } + + public BlockingQueue<Message> getQueue() { + return queue; + } +} diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java new file mode 100644 index 0000000..1c9a530 --- /dev/null +++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqConnector.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.activemq.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.activemq.Config; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; +import java.util.ArrayList; +import java.util.List; + +public class ActivemqConnector extends SourceConnector { + + private KeyValue config; + + @Override + public String verifyAndSetConfig(KeyValue config) { + + for(String requestKey : Config.REQUEST_CONFIG){ + if(!config.containsKey(requestKey)){ + return "Request config key: " + requestKey; + } + } + this.config = config; + return ""; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + @Override + public Class<? extends Task> taskClass() { + return ActivemqTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + List<KeyValue> config = new ArrayList<>(); + config.add(this.config); + return config; + } +} diff --git a/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java new file mode 100644 index 0000000..a04fc50 --- /dev/null +++ b/src/main/java/io/openmessaging/activemq/connector/ActivemqTask.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.activemq.connector; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.jms.Message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSON; + +import io.openmessaging.KeyValue; +import io.openmessaging.activemq.Config; +import io.openmessaging.activemq.Replicator; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.source.SourceTask; + +public class ActivemqTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(ActivemqTask.class); + + private Replicator replicator; + + private Config config; + + @Override + public Collection<SourceDataEntry> poll() { + + List<SourceDataEntry> res = new ArrayList<>(); + + try { + Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); + SourceDataEntry sourceDataEntry = null; + + res.add(sourceDataEntry); + } catch (Exception e) { + log.error("Mysql task poll error, current config:" + JSON.toJSONString(config), e); + } + return res; + } + + @Override + public void start(KeyValue props) { + + try { + this.config = new Config(); + this.config.load(props); + this.replicator = new Replicator(config); + } catch (Exception e) { + log.error("Mysql task start failed.", e); + } + this.replicator.start(); + } + + @Override + public void stop() { + replicator.stop(); + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } +} diff --git a/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java new file mode 100644 index 0000000..4b4b450 --- /dev/null +++ b/src/main/java/io/openmessaging/activemq/pattern/PatternProcessor.java @@ -0,0 +1,83 @@ +package io.openmessaging.activemq.pattern; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.commons.lang3.StringUtils; + +import io.openmessaging.activemq.Config; +import io.openmessaging.activemq.Replicator; + +public class PatternProcessor { + + private Replicator replicator; + + private Config config; + + Connection connection; + + Session session; + + MessageConsumer consumer; + + public PatternProcessor(Replicator replicator) { + this.replicator = replicator; + this.config = replicator.getConfig(); + } + + public void start() { + try { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); + + //2、使用连接工厂创建一个连接对象 + if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) { + connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword()); + }else { + connection = connectionFactory.createConnection(); + } + //3、开启连接 + connection.start(); + //4、使用连接对象创建会话(session)对象 + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) + Destination destination = null; + if(StringUtils.equals("topic", config.getDestinationType())) { + destination = session.createTopic(config.getDestinationName()); + }else if(StringUtils.equals("queue", config.getDestinationType())){ + destination = session.createQueue(config.getDestinationName()); + }else { + throw new RuntimeException(""); + } + consumer = session.createConsumer(destination); + //6、使用会话对象创建生产者对象 + //7、向consumer对象中设置一个messageListener对象,用来接收消息 + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + replicator.commit(message, true); + } + }); + }catch(Exception e) { + + } + } + + public void stop() { + try { + consumer.close(); + session.close(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + +}
