This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 78cc90208215b2f9a9998149be968a7b76575d36 Author: githublaohu <[email protected]> AuthorDate: Fri Jun 21 15:38:21 2019 +0800 [ISSUE #302] Implement rocketmq connect jms (#303) --- README-CN.md | 16 ++ README.md | 15 ++ pom.xml | 201 +++++++++++++++++++++ .../org/apache/rocketmq/connect/jms/Config.java | 165 +++++++++++++++++ .../org/apache/rocketmq/connect/jms/ErrorCode.java | 8 + .../apache/rocketmq/connect/jms/Replicator.java | 67 +++++++ .../jms/connector/BaseJmsSourceConnector.java | 73 ++++++++ .../connect/jms/connector/BaseJmsSourceTask.java | 148 +++++++++++++++ .../connect/jms/pattern/PatternProcessor.java | 90 +++++++++ .../rocketmq/connect/jms/ReplicatorTest.java | 74 ++++++++ .../jms/connector/ActivemqSourceTaskTest.java | 164 +++++++++++++++++ .../jms/connector/BaseJmsSourceConnectorTest.java | 82 +++++++++ 12 files changed, 1103 insertions(+) diff --git a/README-CN.md b/README-CN.md new file mode 100644 index 0000000..be03683 --- /dev/null +++ b/README-CN.md @@ -0,0 +1,16 @@ +##### ActiveConnector完全限定名 +org.apache.rocketmq.connect.activemq.connector.ActivemqConnector + + +##### 配置参数 + +参数 | 作用 | 是否必填 | 默认值 +---|--- |--- | --- +activemq.url | activemq ip与端口号 | 是 | 无 +activemq.username | 用户名 | 否 | 无 +activemq.password| 密码 | 否 | 无 +jms.destination.name | 读取的队列或者主题名 | 是 | 无 +jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无 +jms.message.selector | 过滤器 | 否 |无 +jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE +jms.session.transacted | 是否是事务会话 | 否 | false diff --git a/README.md b/README.md index 8b13789..e15149e 100644 --- a/README.md +++ b/README.md @@ -1 +1,16 @@ +##### ActiveConnector fully-qualified name +org.apache.rocketmq.connect.activemq.connector.ActivemqConnector + +##### parameter configuration + +parameter | effect | required |default +---|--- |--- | --- +activemq.url | The URL of the ActiveMQ broker | yes | null +activemq.username | The username to use when connecting to ActiveMQ | no | null +activemq.password| The 
password to use when connecting to ActiveMQ | no | null +jms.destination.name | The name of the JMS destination (queue or topic) to read from | yes | null +jms.destination.type | The type of JMS destination, which is either queue or topic | yes | null +jms.message.selector | The message selector that should be applied to messages in the destination | no | null +jms.session.acknowledge.mode | The acknowledgement mode for the JMS Session | no | Session.AUTO_ACKNOWLEDGE +jms.session.transacted | Flag to determine if the session is transacted and the session completely controls the message delivery by either committing or rolling back the session | no | false diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c7c3ba2 --- /dev/null +++ b/pom.xml @@ -0,0 +1,201 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-jms</artifactId> + <version>1.0.0</version> + + <name>connect-jms</name> + <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jms</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + 
<include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> 
+ <version>1.2.51</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-all</artifactId> + <version>5.9.0</version> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + <version>2.0</version> + </dependency> + + </dependencies> + +</project> diff --git a/src/main/java/org/apache/rocketmq/connect/jms/Config.java b/src/main/java/org/apache/rocketmq/connect/jms/Config.java new file mode 100644 index 0000000..e9cd178 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/Config.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jms; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; +import javax.jms.Session; + +public class Config { + + @SuppressWarnings("serial") + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("brokerUrl"); + add("destinationType"); + add("destinationName"); + } + }; + + public String brokerUrl; + + public String username; + + public String password; + + public String destinationType; + + public String destinationName; + + public String messageSelector; + + private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + private Boolean sessionTransacted = Boolean.FALSE; + + public void load(KeyValue props) { + + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + 
continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } + + + + public String getBrokerUrl() { + return brokerUrl; + } + + public void setBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDestinationType() { + return destinationType; + } + + public void setDestinationType(String destinationType) { + this.destinationType = destinationType; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public Integer getSessionAcknowledgeMode() { + return sessionAcknowledgeMode; + } + + public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) { + this.sessionAcknowledgeMode = sessionAcknowledgeMode; + } + + public Boolean getSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(Boolean sessionTransacted) { + this.sessionTransacted = sessionTransacted; + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java new file mode 100644 index 0000000..b6dafb1 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/ErrorCode.java @@ -0,0 +1,8 @@ +package org.apache.rocketmq.connect.jms; + +public class ErrorCode { + + public static final int START_ERROR_CODE = 10001; + + public static final int STOP_ERROR_CODE = 10002; +} diff --git 
a/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java new file mode 100644 index 0000000..3e859a4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/Replicator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jms; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Message; + +import org.apache.rocketmq.connect.jms.connector.BaseJmsSourceTask; +import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Replicator { + + private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); + + private PatternProcessor processor; + + private Config config; + private BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + + private BaseJmsSourceTask baseJmsSourceTask; + + public Replicator(Config config , BaseJmsSourceTask baseJmsSourceTask) { + this.config = config; + this.baseJmsSourceTask = baseJmsSourceTask; + } + + public void start() throws Exception { + processor = baseJmsSourceTask.getPatternProcessor(this); + processor.start(); + LOGGER.info("Replicator start succeed"); + } + + public void stop() throws Exception { + processor.stop(); + } + + public void commit(Message message, boolean isComplete) { + queue.add(message); + } + + public Config getConfig() { + return this.config; + } + + public BlockingQueue<Message> getQueue() { + return queue; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java new file mode 100644 index 0000000..f939881 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnector.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jms.connector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; + +public abstract class BaseJmsSourceConnector extends SourceConnector { + + private KeyValue config; + + @Override + public String verifyAndSetConfig(KeyValue config) { + + for (String requestKey : getRequiredConfig()) { + if (!config.containsKey(requestKey)) { + return "Request config key: " + requestKey; + } + } + this.config = config; + return ""; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + @Override + public abstract Class<? 
extends Task> taskClass(); + + @Override + public List<KeyValue> taskConfigs() { + List<KeyValue> config = new ArrayList<>(); + config.add(this.config); + return config; + } + + abstract Set<String> getRequiredConfig(); +} diff --git a/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java new file mode 100644 index 0000000..f43e7fb --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jms.connector; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.rocketmq.connect.jms.Config; +import org.apache.rocketmq.connect.jms.ErrorCode; +import org.apache.rocketmq.connect.jms.Replicator; +import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSON; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.exception.DataConnectException; +import io.openmessaging.connector.api.source.SourceTask; + +public abstract class BaseJmsSourceTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(BaseJmsSourceTask.class); + + private Replicator replicator; + + private Config config; + + private ByteBuffer sourcePartition; + + @Override + public Collection<SourceDataEntry> poll() { + List<SourceDataEntry> res = new ArrayList<>(); + try { + Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); + if (message != null) { + Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageContent(message)}; + SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, null, System.currentTimeMillis(), EntryType.CREATE, null, null, payload); + res.add(sourceDataEntry); + } + } catch (Exception e) { + log.error("activemq task poll error, current 
config:" + JSON.toJSONString(config), e); + } + return res; + } + + @Override + public void start(KeyValue props) { + try { + this.config = new Config(); + this.config.load(props); + this.sourcePartition = ByteBuffer.wrap(config.getBrokerUrl().getBytes("UTF-8")); + this.replicator = new Replicator(config,this); + this.replicator.start(); + } catch (Exception e) { + log.error("activemq task start failed.", e); + throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e); + } + } + + @Override + public void stop() { + try { + replicator.stop(); + } catch (Exception e) { + log.error("activemq task stop failed.", e); + throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e); + } + } + + @Override public void pause() { + + } + + @Override public void resume() { + + } + + @SuppressWarnings("unchecked") + public ByteBuffer getMessageContent(Message message) throws JMSException { + byte[] data = null; + if (message instanceof TextMessage) { + data = ((TextMessage) message).getText().getBytes(); + } else if (message instanceof ObjectMessage) { + data = JSON.toJSONBytes(((ObjectMessage) message).getObject()); + } else if (message instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage) message; + data = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(data); + } else if (message instanceof MapMessage) { + MapMessage mapMessage = (MapMessage) message; + Map<String, Object> map = new HashMap<>(); + Enumeration<Object> names = mapMessage.getMapNames(); + while (names.hasMoreElements()) { + String name = names.nextElement().toString(); + map.put(name, mapMessage.getObject(name)); + } + data = JSON.toJSONBytes(map); + } else if (message instanceof StreamMessage) { + StreamMessage streamMessage = (StreamMessage) message; + ByteArrayOutputStream bis = new ByteArrayOutputStream(); + byte[] by = new byte[1024]; + int i = 0; + while ((i = streamMessage.readBytes(by)) != -1) { + bis.write(by, 0, i); + } + data 
= bis.toByteArray(); + } else { + // The exception is printed and does not need to be written as a DataConnectException + throw new RuntimeException("message type exception"); + } + return ByteBuffer.wrap(data); + } + + public abstract PatternProcessor getPatternProcessor(Replicator replicator); +} diff --git a/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java new file mode 100644 index 0000000..e03691f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jms/pattern/PatternProcessor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jms.pattern; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.jms.Config; +import org.apache.rocketmq.connect.jms.Replicator; + +public abstract class PatternProcessor { + + private Replicator replicator; + + Config config; + + private Connection connection; + + private Session session; + + private MessageConsumer consumer; + + public PatternProcessor(Replicator replicator) { + this.replicator = replicator; + this.config = replicator.getConfig(); + } + + public abstract ConnectionFactory connectionFactory(); + + public void start() throws Exception { + if (!StringUtils.equals("topic", config.getDestinationType()) + && !StringUtils.equals("queue", config.getDestinationType())) { + // RuntimeException is caught by DataConnectException + throw new RuntimeException("destination type is incorrectness"); + } + + ConnectionFactory connectionFactory = connectionFactory(); + + if (StringUtils.isNotBlank(config.getUsername()) + && StringUtils.isNotBlank(config.getPassword())) { + connection = connectionFactory.createConnection(config.getUsername(), config.getPassword()); + } else { + connection = connectionFactory.createConnection(); + } + connection.start(); + Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); + Destination destination = null; + if (StringUtils.equals("topic", config.getDestinationType())) { + destination = session.createTopic(config.getDestinationName()); + } else if (StringUtils.equals("queue", config.getDestinationType())) { + destination = session.createQueue(config.getDestinationName()); + } + consumer = session.createConsumer(destination, config.getMessageSelector()); + 
consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + replicator.commit(message, true); + } + }); + + } + + public void stop() throws Exception { + consumer.close(); + session.close(); + connection.close(); + } + +} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java new file mode 100644 index 0000000..32b8f04 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jms/ReplicatorTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.jms; + +import java.lang.reflect.Field; + +import javax.jms.Message; + +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.rocketmq.connect.jms.pattern.PatternProcessor; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import org.junit.Assert; + +public class ReplicatorTest { + + Replicator replicator; + + PatternProcessor patternProcessor; + + Config config; + + @Before + public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { + config = new Config(); + replicator = new Replicator(config,null); + + patternProcessor = Mockito.mock(PatternProcessor.class); + + Field processor = Replicator.class.getDeclaredField("processor"); + processor.setAccessible(true); + processor.set(replicator, patternProcessor); + } + + @Test(expected = RuntimeException.class) + public void startTest() throws Exception { + replicator.start(); + } + + @Test + public void stop() throws Exception { + replicator.stop(); + Mockito.verify(patternProcessor, Mockito.times(1)).stop(); + } + + @Test + public void commitAddGetQueueTest() { + Message message = new ActiveMQTextMessage(); + replicator.commit(message, false); + Assert.assertEquals(replicator.getQueue().poll(), message); + } + + @Test + public void getConfigTest() { + Assert.assertEquals(replicator.getConfig(), config); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java new file mode 100644 index 0000000..72e818a --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/ActivemqSourceTaskTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jms.connector; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.rocketmq.connect.jms.Config; +import org.apache.rocketmq.connect.jms.Replicator; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.alibaba.fastjson.JSON; + +import io.openmessaging.KeyValue; +import 
io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.internal.DefaultKeyValue; + +public class ActivemqSourceTaskTest { + + public void befores() throws JMSException, InterruptedException { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://112.74.48.251:6166"); + Connection connection = connectionFactory.createConnection(); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("test-queue"); + + MessageProducer producer = session.createProducer(destination); + + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < 20; i++) { + TextMessage message = session.createTextMessage("hello 我是消息:" + i); + producer.send(message); + } + + session.commit(); + session.close(); + connection.close(); + } + + //@Test + public void test() throws InterruptedException { + KeyValue kv = new DefaultKeyValue(); + kv.put("activemqUrl", "tcp://112.74.48.251:6166"); + kv.put("destinationType", "queue"); + kv.put("destinationName", "test-queue"); + ActivemqSourceTask task = new ActivemqSourceTask(); + task.start(kv); + for (int i = 0; i < 20; ) { + Collection<SourceDataEntry> sourceDataEntry = task.poll(); + i = i + sourceDataEntry.size(); + System.out.println(sourceDataEntry); + } + Thread.sleep(20000); + } + + @Test + public void pollTest() throws Exception { + ActivemqSourceTask task = new ActivemqSourceTask(); + TextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText("hello rocketmq"); + + Replicator replicatorObject = Mockito.mock(Replicator.class); + BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + Mockito.when(replicatorObject.getQueue()).thenReturn(queue); + + Field replicator = ActivemqSourceTask.class.getDeclaredField("replicator"); + replicator.setAccessible(true); + replicator.set(task, replicatorObject); + + Field config = ActivemqSourceTask.class.getDeclaredField("config"); + 
config.setAccessible(true); + config.set(task, new Config()); + + queue.put(textMessage); + Collection<SourceDataEntry> list = task.poll(); + Assert.assertEquals(list.size(), 1); + + list = task.poll(); + Assert.assertEquals(list.size(), 0); + + } + + @Test(expected = RuntimeException.class) + public void getMessageConnentTest() throws JMSException { + String value = "hello rocketmq"; + ActivemqSourceTask task = new ActivemqSourceTask(); + TextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText(value); + ByteBuffer buffer = task.getMessageConnent(textMessage); + Assert.assertEquals(new String(buffer.array()), textMessage.getText()); + + ObjectMessage objectMessage = new ActiveMQObjectMessage(); + objectMessage.setObject(value); + buffer = task.getMessageConnent(objectMessage); + Assert.assertEquals(new String(buffer.array()), "\"" + objectMessage.getObject().toString() + "\""); + + BytesMessage bytes = new ActiveMQBytesMessage(); + bytes.writeBytes(value.getBytes()); + bytes.reset(); + buffer = task.getMessageConnent(bytes); + Assert.assertEquals(new String(buffer.array()), value); + + MapMessage mapMessage = new ActiveMQMapMessage(); + mapMessage.setString("hello", "rocketmq"); + buffer = task.getMessageConnent(mapMessage); + Map<String, String> map = JSON.parseObject(buffer.array(), Map.class); + Assert.assertEquals(map.get("hello"), "rocketmq"); + Assert.assertEquals(map.size(), 1); + + StreamMessage streamMessage = new ActiveMQStreamMessage(); + String valueTwo = null; + for (int i = 0; i < 200; i++) { + valueTwo = valueTwo + value; + } + streamMessage.writeBytes(valueTwo.getBytes()); + streamMessage.reset(); + buffer = task.getMessageConnent(streamMessage); + Assert.assertEquals(new String(buffer.array()), valueTwo); + + task.getMessageConnent(null); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java 
b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java new file mode 100644 index 0000000..6c4029a --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceConnectorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.jms.connector; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rocketmq.connect.jms.Config; +import org.junit.Test; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.internal.DefaultKeyValue; + +public class BaseJmsSourceConnectorTest { + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("activemqUrl"); + add("destinationType"); + add("destinationName"); + } + }; + + BaseJmsSourceConnector connector = new BaseJmsSourceConnector() { + + + @Override + public Class<? 
extends Task> taskClass() { + return BaseJmsSourceTask.class; + } + + @Override + Set<String> getRequiredConfig() { + return REQUEST_CONFIG; + } + }; + + @Test + public void verifyAndSetConfigTest() { + KeyValue keyValue = new DefaultKeyValue(); + + for (String requestKey : Config.REQUEST_CONFIG) { + assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + keyValue.put(requestKey, requestKey); + } + assertEquals(connector.verifyAndSetConfig(keyValue), ""); + } + + @Test + public void taskClassTest() { + assertEquals(connector.taskClass(), BaseJmsSourceTask.class); + } + + @Test + public void taskConfigsTest() { + assertEquals(connector.taskConfigs().get(0), null); + KeyValue keyValue = new DefaultKeyValue(); + for (String requestKey : Config.REQUEST_CONFIG) { + keyValue.put(requestKey, requestKey); + } + connector.verifyAndSetConfig(keyValue); + assertEquals(connector.taskConfigs().get(0), keyValue); + } +}
