Release rocketmq-jms 1.0.0 version
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/c4b20122 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/c4b20122 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/c4b20122 Branch: refs/heads/release-rocketmq-jms-1.0.0 Commit: c4b20122a5f75d48167220155efb56eccd308023 Parents: Author: yukon <[email protected]> Authored: Thu Jun 15 10:40:04 2017 +0800 Committer: yukon <[email protected]> Committed: Thu Jun 15 10:41:16 2017 +0800 ---------------------------------------------------------------------- .gitignore | 1 + README.md | 32 ++ rocketmq-jms/.gitignore | 5 + rocketmq-jms/.travis.yml | 43 ++ rocketmq-jms/README.md | 31 ++ rocketmq-jms/core/pom.xml | 48 ++ .../rocketmq/jms/domain/CommonConstant.java | 36 ++ .../rocketmq/jms/domain/CommonContext.java | 183 ++++++++ .../rocketmq/jms/domain/JmsBaseConnection.java | 172 ++++++++ .../jms/domain/JmsBaseConnectionFactory.java | 146 +++++++ .../jms/domain/JmsBaseConnectionMetaData.java | 134 ++++++ .../rocketmq/jms/domain/JmsBaseConstant.java | 86 ++++ .../jms/domain/JmsBaseMessageConsumer.java | 168 +++++++ .../jms/domain/JmsBaseMessageProducer.java | 281 ++++++++++++ .../rocketmq/jms/domain/JmsBaseSession.java | 308 +++++++++++++ .../rocketmq/jms/domain/JmsBaseTopic.java | 53 +++ .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ++++++ .../jms/domain/message/JmsBaseMessage.java | 434 +++++++++++++++++++ .../jms/domain/message/JmsBytesMessage.java | 245 +++++++++++ .../jms/domain/message/JmsObjectMessage.java | 41 ++ .../jms/domain/message/JmsTextMessage.java | 48 ++ .../apache/rocketmq/jms/util/ExceptionUtil.java | 41 ++ .../rocketmq/jms/util/MessageConverter.java | 182 ++++++++ .../rocketmq/jms/util/MsgConvertUtil.java | 90 ++++ .../apache/rocketmq/jms/util/URISpecParser.java | 61 +++ .../core/src/main/resources/application.conf | 1 + 
.../apache/rocketmq/jms/JmsTestListener.java | 67 +++ .../org/apache/rocketmq/jms/JmsTestUtil.java | 54 +++ .../jms/domain/message/JmsBytesMessageTest.java | 103 +++++ .../domain/message/JmsMessageConvertTest.java | 52 +++ .../domain/message/JmsObjectMessageTest.java | 92 ++++ .../jms/domain/message/JmsTextMessageTest.java | 50 +++ .../jms/integration/IntegrationTestBase.java | 199 +++++++++ .../rocketmq/jms/integration/JmsClientIT.java | 191 ++++++++ .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ++++++ .../rocketmq/jms/util/URISpecParserTest.java | 43 ++ rocketmq-jms/pom.xml | 196 +++++++++ rocketmq-jms/spring/pom.xml | 82 ++++ .../SimpleExMessageListenerContainer.java | 90 ++++ .../rocketmq/jms/spring/JmsConsumeIT.java | 61 +++ .../rocketmq/jms/spring/JmsProduceIT.java | 93 ++++ .../rocketmq/jms/spring/SpringTestBase.java | 41 ++ .../spring/src/test/resources/consumer.xml | 51 +++ .../spring/src/test/resources/producer.xml | 43 ++ rocketmq-jms/style/copyright/Apache.xml | 24 + .../style/copyright/profiles_settings.xml | 64 +++ rocketmq-jms/style/rmq_checkstyle.xml | 135 ++++++ rocketmq-jms/style/rmq_codeStyle.xml | 157 +++++++ 48 files changed, 5017 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md new file mode 100644 index 0000000..07a5fa6 --- /dev/null +++ b/README.md @@ -0,0 +1,32 @@ +# RocketMQ Externals + +There are some RocketMQ external projects, with the purpose of growing the RocketMQ community. 
+ +## RocketMQ-Console-Ng +A console for RocketMQ + +## RocketMQ-JMS +RocketMQ-JMS is an implementation of the JMS specification, taking Apache RocketMQ as the broker. We are currently working toward JMS 1.1 support, and JMS 2.0 is our final target. + +## RocketMQ-Flume-Ng + +This project is used to receive and send messages between +[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume) + +1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume). +2. Ensure that the jar related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exists in the local Maven repository. +3. Execute the following command in the rocketmq-flume root directory + + `mvn clean install dependency:copy-dependencies` + +4. Copy the jar that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jar will be given later) + + +## RocketMQ-Spark + +Apache Spark-Streaming integration with RocketMQ. Both push and pull consumer modes are provided. +For more details please refer to rocketmq-spark README.md. + +## RocketMQ-Docker +Apache RocketMQ Docker provides a Dockerfile and bash scripts for building and running the Docker image. 
+ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore new file mode 100644 index 0000000..d2e5aaf --- /dev/null +++ b/rocketmq-jms/.gitignore @@ -0,0 +1,5 @@ +.idea/ +*.iml +*.ipr +*.iws +target/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/.travis.yml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml new file mode 100644 index 0000000..9f430b2 --- /dev/null +++ b/rocketmq-jms/.travis.yml @@ -0,0 +1,43 @@ +notifications: + email: + recipients: + - [email protected] + - [email protected] + on_success: change + on_failure: always + +language: java + +matrix: + include: + # On OSX, run with default JDK only. + # - os: osx + # On Linux, run with specific JDKs only. + # - os: linux + # env: CUSTOM_JDK="oraclejdk8" + - os: linux + env: CUSTOM_JDK="oraclejdk7" + #- os: linux + # env: CUSTOM_JDK="openjdk7" + +before_install: + - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc + - cat ~/.mavenrc + - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi + - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi + +#os: +# - linux +# - osx +#jdk: +# - oraclejdk8 +# - oraclejdk7 +# - openjdk7 + + +script: + - travis_retry mvn -B clean install jacoco:report coveralls:report + +#after_success: +# - mvn clean install +# - mvn sonar:sonar http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md new file mode 100644 index 0000000..a05e27e --- /dev/null +++ 
b/rocketmq-jms/README.md @@ -0,0 +1,31 @@ +# RocketMQ-JMS [](https://travis-ci.org/rocketmq/rocketmq-jms) [](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master) + + +## Introduction +RocketMQ-JMS is an implementation of the JMS specification, taking Apache RocketMQ as the broker. +We are currently working toward JMS 1.1 support, and JMS 2.0 is our final target. + +The first version of RocketMQ-JMS will be released soon, and new features will be developed on the branch "v1.1". +Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see the features planned for the next version. + + +## Building + + > cd rocketmq-jms + > mvn clean install + + **run unit test:** + > mvn test + + **run integration test:** + > mvn verify + + **see jacoco code coverage report** + > open core/target/site/jacoco/index.html + > open core/target/site/jacoco-it/index.html + > open spring/target/site/jacoco-it/index.html + + +## Guidelines + + Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml new file mode 100644 index 0000000..1b36e14 --- /dev/null +++ b/rocketmq-jms/core/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-jms-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-jms</artifactId> + <version>1.0-SNAPSHOT</version> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java new file mode 100644 index 0000000..80a8b64 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +public interface CommonConstant { + + String PRODUCERID = "producerId"; + + String CONSUMERID = "consumerId"; + + String PROVIDER = "provider"; + + String NAMESERVER = "nameServer"; + + String INSTANCE_NAME = "instanceName"; + + String CONSUME_THREAD_NUMS = "consumeThreadNums"; + + String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis"; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java new file mode 100644 index 0000000..c8e4276 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +public class CommonContext { + private String accessKey; + private String secretKey; + + private String consumerId; + private String producerId; + + private String provider; + + private String appId; + + private String nameServer; + + /** + * MQType + */ + private String mqType; + + /** + * Using for distinguishing client jvm process + */ + private String instanceName; + /** + * Set consumer threadPool Size + */ + private int consumeThreadNums; + /** + * Set send message timeOut + */ + private int sendMsgTimeoutMillis = -1; + + /** + * @return the appId + */ + public String getAppId() { + return appId; + } + + /** + * @param appId the appId to set + */ + public void setAppId(String appId) { + this.appId = appId; + } + + /** + * @return the provider + */ + public String getProvider() { + return provider; + } + + /** + * @param provider the provider to set + */ + public void setProvider(String provider) { + this.provider = provider; + } + + /** + * @return the instanceName + */ + public String getInstanceName() { + return instanceName; + } + + /** + * @param instanceName the instanceName to set + */ + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + /** + * @return the 
accessKey + */ + public String getAccessKey() { + return accessKey; + } + + /** + * @param accessKey the accessKey to set + */ + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + /** + * @return the secretKey + */ + public String getSecretKey() { + return secretKey; + } + + /** + * @param secretKey the secretKey to set + */ + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + /** + * @return consumer thread nums + */ + public int getConsumeThreadNums() { + return consumeThreadNums; + } + + /** + * @param consumeThreadNums + */ + public void setConsumeThreadNums(int consumeThreadNums) { + this.consumeThreadNums = consumeThreadNums; + } + + public String getConsumerId() { + return consumerId; + } + + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + + public String getProducerId() { + return producerId; + } + + public void setProducerId(String producerId) { + this.producerId = producerId; + } + + public int getSendMsgTimeoutMillis() { + return sendMsgTimeoutMillis; + } + + public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) { + this.sendMsgTimeoutMillis = sendMsgTimeoutMillis; + } + + public String getMqType() { + return mqType; + } + + public void setMqType(String mqType) { + this.mqType = mqType; + } + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.DEFAULT_STYLE); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java new file mode 100644 index 0000000..4c809c7 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.commons.lang.StringUtils; + +public class JmsBaseConnection implements Connection { + private final AtomicBoolean started = new AtomicBoolean(false); + protected String clientID; + protected ExceptionListener exceptionListener; + protected CommonContext context; + protected JmsBaseSession session; + + public JmsBaseConnection(Map<String, String> connectionParams) { + + this.clientID = UUID.randomUUID().toString(); + + context = new CommonContext(); + + //At lease one should be set + context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID)); + context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID)); + + //optional + context.setProvider(connectionParams.get(CommonConstant.PROVIDER)); + + String nameServer = connectionParams.get(CommonConstant.NAMESERVER); + String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS); + String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS); + String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME); + + if (StringUtils.isNotEmpty(nameServer)) { + context.setNameServer(nameServer); + } + if (StringUtils.isNotEmpty(instanceName)) { + context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME)); + } + + if (StringUtils.isNotEmpty(consumerThreadNums)) { + context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums)); + } + if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) { + 
context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis)); + } + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + + Preconditions.checkArgument(!transacted, "Not support transaction Session !"); + Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode, + "Not support this acknowledge mode: " + acknowledgeMode); + + if (null != this.session) { + return this.session; + } + synchronized (this) { + if (null != this.session) { + return this.session; + } + this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context); + if (isStarted()) { + this.session.start(); + } + return this.session; + } + } + + @Override + public String getClientID() throws JMSException { + return this.clientID; + } + + @Override + public void setClientID(String clientID) throws JMSException { + this.clientID = clientID; + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return new JmsBaseConnectionMetaData(); + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + return this.exceptionListener; + } + + @Override + public void setExceptionListener(ExceptionListener listener) throws JMSException { + this.exceptionListener = listener; + } + + @Override + public void start() throws JMSException { + if (started.compareAndSet(false, true)) { + if (this.session != null) { + this.session.start(); + } + + } + } + + @Override + public void stop() throws JMSException { + //Stop the connection before closing it. + //Do nothing here. 
+ } + + @Override + public void close() throws JMSException { + if (started.compareAndSet(true, false)) { + if (this.session != null) { + this.session.close(); + } + + } + } + + @Override + public ConnectionConsumer createConnectionConsumer(Destination destination, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Whether the connection is started. + * + * @return whether the connection is started. + */ + public boolean isStarted() { + return started.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java new file mode 100644 index 0000000..1b9da06 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import org.apache.rocketmq.jms.util.URISpecParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsBaseConnectionFactory implements ConnectionFactory { + + private static Logger logger = LoggerFactory + .getLogger(JmsBaseConnectionFactory.class); + /** + * Synchronization monitor for the shared Connection + */ + private final Object connectionMonitor = new Object(); + /** + * Can be configured in a consistent way without too much URL hacking. + */ + protected URI connectionUri; + /** + * Store connection uri query parameters. 
+ */ + protected Map<String, String> connectionParams; + /** + * Wrapped Connection + */ + protected JmsBaseConnection connection; + + public JmsBaseConnectionFactory() { + + } + + public JmsBaseConnectionFactory(URI connectionUri) { + setConnectionUri(connectionUri); + } + + public void setConnectionUri(URI connectionUri) { + Preconditions.checkNotNull(connectionUri, "Please set URI !"); + this.connectionUri = connectionUri; + this.connectionParams = URISpecParser.parseURI(connectionUri.toString()); + + if (null != connectionParams) { + Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) || + null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !"); + } + + } + + @Override + public Connection createConnection() throws JMSException { + synchronized (this.connectionMonitor) { + if (this.connection == null) { + initConnection(); + } + return this.connection; + } + } + + /** + * Using userName and Password to create a connection + * + * @param userName ignored + * @param password ignored + * @return the new JMS Connection + * @throws JMSException + */ + @Override + public Connection createConnection(String userName, String password) throws JMSException { + logger.debug("Using userName and Password to create a connection."); + return this.createConnection(); + } + + /** + * Initialize the underlying shared Connection. + * <p/> + * Closes and reInitializes the Connection if an underlying Connection is present already. + * + * @throws javax.jms.JMSException if thrown by JMS API methods + */ + protected void initConnection() throws JMSException { + synchronized (this.connectionMonitor) { + if (this.connection != null) { + closeConnection(this.connection); + } + this.connection = doCreateConnection(); + logger.debug("Established shared JMS Connection: {}", this.connection); + } + } + + /** + * Close the given Connection. 
+ * + * @param con the Connection to close + */ + protected void closeConnection(Connection con) { + logger.debug("Closing shared JMS Connection: {}", this.connection); + try { + try { + con.stop(); + } + finally { + con.close(); + } + } + catch (Throwable ex) { + logger.error("Could not close shared JMS Connection.", ex); + } + } + + /** + * Create a JMS Connection + * + * @return the new JMS Connection + * @throws javax.jms.JMSException if thrown by JMS API methods + */ + protected JmsBaseConnection doCreateConnection() throws JMSException { + Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0, + "Connection Parameters can not be null!"); + this.connection = new JmsBaseConnection(this.connectionParams); + + return connection; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java new file mode 100644 index 0000000..ee549aa --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; +import java.util.Properties; +import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; + +public class JmsBaseConnectionMetaData implements ConnectionMetaData { + public static final String JMS_VERSION; + public static final int JMS_MAJOR_VERSION; + public static final int JMS_MINOR_VERSION; + + public static final String PROVIDER_VERSION; + public static final int PROVIDER_MAJOR_VERSION; + public static final int PROVIDER_MINOR_VERSION; + + public static final String PROVIDER_NAME = "Apache RocketMQ"; + + public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData(); + + public static InputStream resourceStream; + + static { + Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); + + String jmsVersion = null; + int jmsMajor = 0; + int jmsMinor = 0; + try { + Package p = Package.getPackage("javax.jms"); + if (p != null) { + jmsVersion = p.getImplementationVersion(); + Matcher m = pattern.matcher(jmsVersion); + if (m.matches()) { + jmsMajor = Integer.parseInt(m.group(1)); + jmsMinor = Integer.parseInt(m.group(2)); + } + } + } + catch (Throwable e) { + } + JMS_VERSION = jmsVersion; + JMS_MAJOR_VERSION = jmsMajor; + JMS_MINOR_VERSION = jmsMinor; + + String providerVersion = null; + int providerMajor = 0; + int providerMinor = 0; + Properties properties = new Properties(); + try { + 
resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf"); + properties.load(resourceStream); + providerVersion = properties.getProperty("version"); + + Matcher m = pattern.matcher(providerVersion); + if (m.matches()) { + providerMajor = Integer.parseInt(m.group(1)); + providerMinor = Integer.parseInt(m.group(2)); + } + } + catch (IOException e) { + e.printStackTrace(); + } + PROVIDER_VERSION = providerVersion; + PROVIDER_MAJOR_VERSION = providerMajor; + PROVIDER_MINOR_VERSION = providerMinor; + + } + + public String getJMSVersion() throws JMSException { + return JMS_VERSION; + } + + public int getJMSMajorVersion() throws JMSException { + return JMS_MAJOR_VERSION; + } + + public int getJMSMinorVersion() throws JMSException { + return JMS_MINOR_VERSION; + } + + public String getJMSProviderName() throws JMSException { + return PROVIDER_NAME; + } + + public String getProviderVersion() throws JMSException { + return PROVIDER_VERSION; + } + + public int getProviderMajorVersion() throws JMSException { + return PROVIDER_MAJOR_VERSION; + } + + public int getProviderMinorVersion() throws JMSException { + return PROVIDER_MINOR_VERSION; + } + + public Enumeration<?> getJMSXPropertyNames() throws JMSException { + Vector<String> jmxProperties = new Vector<String>(); + jmxProperties.add("jmsXUserId"); + jmxProperties.add("jmsXAppId"); + jmxProperties.add("jmsXGroupID"); + jmxProperties.add("jmsXGroupSeq"); + jmxProperties.add("jmsXState"); + jmxProperties.add("jmsXDeliveryCount"); + jmxProperties.add("jmsXProducerTXID"); + jmxProperties.add("jmsConsumerTXID"); + jmxProperties.add("jmsRecvTimeStamp"); + return jmxProperties.elements(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java new file mode 100644 index 0000000..f0bca28 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.domain; + +/** + * Names of the JMS message headers and JMS-defined properties used by this provider, plus default header values. + */ +public interface JmsBaseConstant { + //------------------------JMS message header constant--------------------------------- + String JMS_DESTINATION = "jmsDestination"; + String JMS_DELIVERY_MODE = "jmsDeliveryMode"; + String JMS_EXPIRATION = "jmsExpiration"; + String JMS_DELIVERY_TIME = "jmsDeliveryTime"; + String JMS_PRIORITY = "jmsPriority"; + String JMS_MESSAGE_ID = "jmsMessageID"; + String JMS_TIMESTAMP = "jmsTimestamp"; + String JMS_CORRELATION_ID = "jmsCorrelationID"; + String JMS_REPLY_TO = "jmsReplyTo"; + String JMS_TYPE = "jmsType"; + String JMS_REDELIVERED = "jmsRedelivered"; + + //-------------------------JMS defined properties constant---------------------------- + /** + * The identity of the user sending the message + */ + String JMS_XUSER_ID = "jmsXUserID"; + /** + * The identity of the application sending the message + */ + String JMS_XAPP_ID = "jmsXAppID"; + /** + * The number of message delivery attempts + */ + String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount"; + /** + * The identity of the message group this message is part of + */ + String JMS_XGROUP_ID = "jmsXGroupID"; + /** + * The sequence number of this message within the group; the first message is 1, the second 2,... + */ + String JMS_XGROUP_SEQ = "jmsXGroupSeq"; + /** + * The transaction identifier of the transaction within which this message was produced + */ + String JMS_XPRODUCER_TXID = "jmsXProducerTXID"; + /** + * The transaction identifier of the transaction within which this message was consumed + */ + String JMS_XCONSUMER_TXID = "jmsXConsumerTXID"; + + /** + * The time JMS delivered the message to the consumer + */ + String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp"; + /** + * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and + * that these copies exist from the time the original message was sent. 
Each copy's state is one of: 1(waiting), + * 2(ready), 3(expired) or 4(retained). Since state is of no interest to producers and consumers it is not provided + * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this. + */ + String JMS_XSTATE = "jmsXState"; + + //---------------------------JMS Headers' value constant--------------------------- + /** + * Default time to live, in milliseconds (3 days) + */ + long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000; + + /** + * Default Jms Type + */ + String DEFAULT_JMS_TYPE = "rocketmq"; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java new file mode 100644 index 0000000..b62e928 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.MapMaker; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +/** + * {@link MessageConsumer} backed by a RocketMQ push consumer. + * <p> + * One {@link RMQPushConsumerExt} is shared, with reference counting, by every consumer created with the same consumer + * id. Only listener-based delivery is supported; the {@code receive} methods throw + * {@link UnsupportedOperationException}. + */ +public class JmsBaseMessageConsumer implements MessageConsumer { + + private static final Object LOCK_OBJECT = new Object(); + //all shared consumers + private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private CommonContext context; + private Destination destination; + private MessageListener messageListener; + + public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext, + JmsBaseConnection connection) throws JMSException { + synchronized (LOCK_OBJECT) { + checkArgs(destination, commonContext); + + if (null == consumerMap.get(context.getConsumerId())) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId()); + if (context.getConsumeThreadNums() > 0) { + consumer.setConsumeThreadMax(context.getConsumeThreadNums()); + consumer.setConsumeThreadMin(context.getConsumeThreadNums()); + } + if (!Strings.isNullOrEmpty(context.getNameServer())) { + consumer.setNamesrvAddr(context.getNameServer()); + } + if (!Strings.isNullOrEmpty(context.getInstanceName())) { + consumer.setInstanceName(context.getInstanceName()); + } + consumer.setConsumeMessageBatchMaxSize(1); + //add subscribe? 
+ RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer); + consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt); + } + + consumerMap.get(context.getConsumerId()).incrementAndGet(); + + //If the connection has been started, start the consumer right now. + //add start status? + RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId()); + if (connection.isStarted()) { + try { + consumerExt.start(); + } + catch (MQClientException mqe) { + JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId()); + jmsException.initCause(mqe); + throw jmsException; + } + } + } + + } + + /** + * Validates the constructor arguments and stores them in the instance fields. + * + * @param destination destination to consume from; must not be null + * @param context provider context carrying the consumer id; must not be null + * @throws NullPointerException if an argument, the consumer id or the destination name is null + */ + private void checkArgs(Destination destination, CommonContext context) throws JMSException { + /* Check the references themselves first so that a null argument fails with a descriptive message. */ + Preconditions.checkNotNull(context, "Context can not be null!"); + Preconditions.checkNotNull(destination, "Destination can not be null!"); + Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!"); + Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); + this.context = context; + this.destination = destination; + } + + @Override + public String getMessageSelector() throws JMSException { + return null; + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return this.messageListener; + } + + /** + * Registers the listener and subscribes the shared RocketMQ consumer to this consumer's topic and message type. + * + * @param listener listener that receives the messages + * @throws JMSException if the subscription fails; the RocketMQ exception is attached as the cause + */ + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt) { + try { + this.messageListener = listener; + String messageTopic = ((JmsBaseTopic) destination).getMessageTopic(); + String messageType = ((JmsBaseTopic) destination).getMessageType(); + rocketmqConsumerExt.subscribe(messageTopic, messageType, listener); + } + catch (MQClientException mqe) { + //add what? 
+ JMSException jmsException = new JMSException(mqe.getMessage()); + jmsException.initCause(mqe); + throw jmsException; + } + + } + + } + + @Override + public Message receive() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public Message receive(long timeout) throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public Message receiveNoWait() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + /** + * Releases this consumer's reference to the shared RocketMQ consumer and shuts the shared consumer down when the + * last reference is released. Calling it more than once has no further effect. + */ + @Override + public void close() throws JMSException { + synchronized (LOCK_OBJECT) { + if (closed.compareAndSet(false, true)) { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) { + rocketmqConsumerExt.close(); + consumerMap.remove(context.getConsumerId()); + } + } + } + } + + /** + * Start the consumer to get message from the Broker. + */ + public void startConsumer() throws JMSException { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt) { + try { + rocketmqConsumerExt.start(); + } + catch (MQClientException mqe) { + throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed"); + } + } + } + + public Destination getDestination() throws JMSException { + return this.destination; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java new file mode 100644 index 0000000..8dd82f0 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.MapMaker; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentMap; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.jms.domain.message.JmsBaseMessage; +import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; +import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; +import org.apache.rocketmq.jms.domain.message.JmsTextMessage; +import org.apache.rocketmq.jms.util.ExceptionUtil; +import org.apache.rocketmq.jms.util.MessageConverter; +import org.apache.rocketmq.jms.util.MsgConvertUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link MessageProducer} that sends JMS messages through a RocketMQ {@link MQProducer}. + * <p> + * Producers are kept in a static map keyed by producer id, so every instance created with the same producer id reuses + * the same started RocketMQ producer. + */ +public class JmsBaseMessageProducer implements 
MessageProducer { + + private static final Object LOCK_OBJECT = new Object(); + private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap(); + private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class); + private CommonContext context; + + private Destination destination; + + public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException { + synchronized (LOCK_OBJECT) { + checkArgs(destination, context); + + if (null == producerMap.get(this.context.getProducerId())) { + DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId()); + if (!Strings.isNullOrEmpty(context.getNameServer())) { + producer.setNamesrvAddr(context.getNameServer()); + } + if (!Strings.isNullOrEmpty(context.getInstanceName())) { + producer.setInstanceName(context.getInstanceName()); + } + if (context.getSendMsgTimeoutMillis() > 0) { + producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis()); + } + try { + producer.start(); + } + catch (MQClientException mqe) { + throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId())); + } + producerMap.putIfAbsent(this.context.getProducerId(), producer); + } + + } + } + + /** + * Validates the constructor arguments and stores them in the instance fields. + * + * @param destination default destination of this producer; must not be null + * @param context provider context carrying the producer id; must not be null + * @throws NullPointerException if an argument, the producer id or the destination name is null + */ + private void checkArgs(Destination destination, CommonContext context) throws JMSException { + /* Check the references themselves first so that a null argument fails with a descriptive message. */ + Preconditions.checkNotNull(context, "Context can not be null!"); + Preconditions.checkNotNull(destination, "Destination can not be null!"); + Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!"); + Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); + this.context = context; + this.destination = destination; + } + + @Override + public boolean getDisableMessageID() throws JMSException { + return false; + } + + @Override + public void setDisableMessageID(boolean value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public boolean getDisableMessageTimestamp() throws JMSException { + return false; + } + + @Override + public void setDisableMessageTimestamp(boolean value) throws 
JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public int getDeliveryMode() throws JMSException { + return javax.jms.Message.DEFAULT_DELIVERY_MODE; + } + + @Override + public void setDeliveryMode(int deliveryMode) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public int getPriority() throws JMSException { + return javax.jms.Message.DEFAULT_PRIORITY; + } + + @Override + public void setPriority(int defaultPriority) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public long getTimeToLive() throws JMSException { + return JmsBaseConstant.DEFAULT_TIME_TO_LIVE; + } + + @Override + public void setTimeToLive(long timeToLive) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public Destination getDestination() throws JMSException { + return this.destination; + } + + @Override + public void close() throws JMSException { + //Nothing to do + } + + @Override + public void send(javax.jms.Message message) throws JMSException { + this.send(getDestination(), message); + } + + /** + * Send the message to the defined Destination success---return normally. Exception---throw out JMSException. + * + * @param destination see <CODE>Destination</CODE> + * @param message the message to be sent. 
+ * @throws javax.jms.JMSException + */ + @Override + public void send(Destination destination, javax.jms.Message message) throws JMSException { + JmsBaseMessage jmsMsg = (JmsBaseMessage) message; + initJMSHeaders(jmsMsg, destination); + + try { + if (context == null) { + throw new IllegalStateException("Context should be inited"); + } + org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg); + + MQProducer producer = producerMap.get(context.getProducerId()); + + if (producer == null) { + throw new Exception("producer is null "); + } + SendResult sendResult = producer.send(rocketmqMsg); + if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { + jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId()); + } else { + throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString())); + } + } + catch (Exception e) { + logger.error("Send rocketmq message failure !", e); + //if fail to send the message, throw out JMSException + JMSException jmsException = new JMSException("Send rocketmq message failure!"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } + } + + @Override + public void send(javax.jms.Message message, int deliveryMode, int priority, + long timeToLive) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void send(Destination destination, javax.jms.Message message, int deliveryMode, + int priority, long timeToLive) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Init the JmsMessage Headers. + * <p/> + * <P>JMS providers init message's headers. Do not allow user to set these by yourself. 
+ * + * @param jmsMsg message + * @param destination + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException { + + //JMS_DESTINATION default:"topic:message" + jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination); + //JMS_DELIVERY_MODE default : PERSISTENT + jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); + //JMS_TIMESTAMP default : current time + jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, System.currentTimeMillis()); + //JMS_EXPIRATION default : 3 days + //JMS_EXPIRATION = currentTime + time_to_live + jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE); + //JMS_PRIORITY default : 4 + jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); + //JMS_TYPE default : open notification service + jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE); + //JMS_REPLY_TO,JMS_CORRELATION_ID default : null + //JMS_MESSAGE_ID is set by sendResult. + //JMS_REDELIVERED is set by broker. + } + + /** + * Init the RocketMQ message headers. + * <p/> + * <P>When converting a JmsMessage to a RocketMQ message, read the JmsMessage's properties and headers and write + * them to the RocketMQ message's properties. Entries whose value is null are skipped. 
+ * + * @param jmsMsg message + * @param topic RocketMQ topic, stored under {@code MsgConvertUtil.MSG_TOPIC} + * @param messageType message type (tag), stored under {@code MsgConvertUtil.MSG_TYPE} + * @return the merged properties + * @throws javax.jms.JMSException + */ + public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg, + String topic, String messageType) throws JMSException { + Properties userProperties = new Properties(); + + //Jms userProperties to properties + Map<String, Object> userProps = jmsMsg.getProperties(); + Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator(); + while (userPropsIter.hasNext()) { + Map.Entry<String, Object> entry = userPropsIter.next(); + /* java.util.Properties rejects null values, so unset entries are skipped instead of failing. */ + if (entry.getValue() != null) { + userProperties.setProperty(entry.getKey(), entry.getValue().toString()); + } + } + //Jms systemProperties to ROCKETMQ properties + Map<String, Object> sysProps = jmsMsg.getHeaders(); + Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator(); + while (sysPropsIter.hasNext()) { + Map.Entry<String, Object> entry = sysPropsIter.next(); + /* Same rule for headers. */ + if (entry.getValue() != null) { + userProperties.setProperty(entry.getKey(), entry.getValue().toString()); + } + } + + //Jms message Model + if (jmsMsg instanceof JmsBytesMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES); + } + else if (jmsMsg instanceof JmsObjectMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ); + } + else if (jmsMsg instanceof JmsTextMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT); + } + + //message topic and tag + userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic); + userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType); + + return userProperties; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java new file mode 100644 index 0000000..5bf7005 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; +import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; +import org.apache.rocketmq.jms.domain.message.JmsTextMessage; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +/** + * {@link Session} implementation for the RocketMQ JMS provider. + * <p> + * It creates bytes, object and text messages, topics, producers, and consumers via + * {@link #createConsumer(Destination)}, and tracks the consumers it created so that they can be started and closed + * together. Most other operations throw {@link UnsupportedOperationException}. + */ +public class JmsBaseSession implements Session { + protected CommonContext context; + protected JmsBaseConnection connection; + protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList = + new CopyOnWriteArrayList<JmsBaseMessageConsumer>(); + private boolean transacted = true; + private int acknowledgeMode = AUTO_ACKNOWLEDGE; + + public JmsBaseSession(JmsBaseConnection connection, boolean transacted, + int acknowledgeMode, CommonContext context) { + this.context = context; + this.acknowledgeMode = acknowledgeMode; + this.transacted = transacted; + this.connection = connection; + } + + @Override + public BytesMessage createBytesMessage() throws JMSException { + return new JmsBytesMessage(); + } + + @Override + public MapMessage createMapMessage() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Message createMessage() throws JMSException { + throw new 
UnsupportedOperationException("Unsupported"); + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + return new JmsObjectMessage(); + } + + @Override + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + return new JmsObjectMessage(object); + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public TextMessage createTextMessage() throws JMSException { + return new JmsTextMessage(); + } + + @Override + public TextMessage createTextMessage(String text) throws JMSException { + return new JmsTextMessage(text); + } + + @Override + public boolean getTransacted() throws JMSException { + return this.transacted; + } + + @Override + public int getAcknowledgeMode() { + return this.acknowledgeMode; + } + + @Override + public void commit() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void rollback() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void close() throws JMSException { + for (JmsBaseMessageConsumer messageConsumer : consumerList) { + messageConsumer.close(); + } + consumerList.clear(); + } + + @Override + public void recover() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return null; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public void run() { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + return new JmsBaseMessageProducer(destination, context); + } + + /** + * Create a MessageConsumer. 
+ * <p/> + * <P>Create a durable consumer to the specified destination + * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + JmsBaseMessageConsumer messageConsumer = new + JmsBaseMessageConsumer(destination, this.context, this.connection); + this.consumerList.addIfAbsent(messageConsumer); + return messageConsumer; + } + + /** + * Create a MessageConsumer with messageSelector. + * <p/> + * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages + * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @param messageSelector For filtering messages + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) + throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + + } + + /** + * Create a MessageConsumer with messageSelector. + * <p/> + * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages and do not support this mechanism to reject + * messages from localhost. 
+ * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @param messageSelector For filtering messages + * @param noLocal If true: reject messages from localhost + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector, + boolean noLocal) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Queue createQueue(String queueName) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Topic createTopic(String topicName) throws JMSException { + Preconditions.checkNotNull(topicName); + List<String> msgTuple = Arrays.asList(topicName.split(":")); + + Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2, + "Destination must match messageTopic:messageType !"); + + //If messageType is null, use * instead. + if (1 == msgTuple.size()) { + return new JmsBaseTopic(msgTuple.get(0), "*"); + } + return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)); + } + + /** + * Create a MessageConsumer with durable subscription. + * <p/> + * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable + * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method. + * + * @param topic destination + * @throws javax.jms.JMSException + * @see <CODE>Topic</CODE> + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) + throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Create a MessageConsumer with durable subscription. + * <p/> + * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable + * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method. 
+ * + * @param topic destination + * @throws javax.jms.JMSException + * @see <CODE>Topic</CODE> + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name, + String messageSelector, + boolean noLocal) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Creates a temporary queue whose name is a random UUID fixed at creation time. + */ + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + return new TemporaryQueue() { + /* Generated once so that repeated getQueueName() calls report the same name. */ + private final String queueName = UUID.randomUUID().toString(); + + public void delete() throws JMSException { + } + + public String getQueueName() throws JMSException { + return queueName; + } + }; + } + + /** + * Creates a temporary topic whose name is a random UUID fixed at creation time. + */ + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + return new TemporaryTopic() { + /* Generated once so that repeated getTopicName() calls report the same name. */ + private final String topicName = UUID.randomUUID().toString(); + + public void delete() throws JMSException { + } + + public String getTopicName() throws JMSException { + return topicName; + } + }; + } + + @Override + public void unsubscribe(String name) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Starts every consumer created by this session. + */ + public void start() throws JMSException { + for (JmsBaseMessageConsumer messageConsumer : consumerList) { + messageConsumer.startConsumer(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java new file mode 100644 index 0000000..b7e2fab --- /dev/null +++ 
b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import javax.jms.JMSException; +import javax.jms.Topic; + +/** + * {@link Topic} identified by a message topic and a message type; its name has the form + * {@code messageTopic:messageType}. + */ +public class JmsBaseTopic implements Topic { + + private final String messageTopic; + private final String messageType; + + /** + * @param messageTopic message topic; must not be null + * @param messageType message type; must not be null + * @throws NullPointerException if either argument is null + */ + public JmsBaseTopic(String messageTopic, String messageType) { + Preconditions.checkNotNull(messageTopic); + Preconditions.checkNotNull(messageType); + + this.messageTopic = messageTopic; + this.messageType = messageType; + } + + /** + * @return the same {@code messageTopic:messageType} string as {@link #toString()} + */ + @Override + public String getTopicName() throws JMSException { + return this.toString(); + } + + /** + * @return the message topic and message type joined with {@code ":"} + */ + @Override + public String toString() { + return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType()); + } + + public String getMessageTopic() { + return messageTopic; + } + + public String getMessageType() { + return messageType; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/c4b20122/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java 
---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java new file mode 100644 index 0000000..7a8a9f7 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.domain; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.util.MessageConverter; + +public class RMQPushConsumerExt { + private final MQPushConsumer consumer; + private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>(); + + private AtomicInteger referenceCount = new AtomicInteger(0); + private AtomicBoolean started = new AtomicBoolean(false); + + public RMQPushConsumerExt(MQPushConsumer consumer) { + this.consumer = consumer; + } + + public MQPushConsumer getConsumer() { + return consumer; + } + + public int incrementAndGet() { + return referenceCount.incrementAndGet(); + } + + public int decrementAndGet() { + return referenceCount.decrementAndGet(); + } + + public int getReferenceCount() { + return referenceCount.get(); + } + public void start() throws MQClientException { + if (consumer == null) { + throw new MQClientException(-1, "consumer is null"); + } + + if (this.started.compareAndSet(false, true)) { + this.consumer.registerMessageListener(new MessageListenerImpl()); + this.consumer.start(); + } + } + + + public void close() { + if (this.started.compareAndSet(true, false)) { + this.consumer.shutdown(); + } + } + + public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException { + if (null == topic) { + 
throw new MQClientException(-1, "topic is null"); + } + + if (null == listener) { + throw new MQClientException(-1, "listener is null"); + } + + try { + this.subscribeTable.put(topic, listener); + this.consumer.subscribe(topic, subExpression); + } catch (MQClientException e) { + throw new MQClientException("consumer subscribe exception", e); + } + } + + public void unsubscribe(String topic) { + if (null != topic) { + this.consumer.unsubscribe(topic); + } + } + + class MessageListenerImpl implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) { + MessageExt msgRMQ = msgsRMQList.get(0); + javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic()); + if (null == listener) { + throw new RuntimeException("MessageListener is null"); + } + + try { + listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ)); + } + catch (Exception e) { + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + + public boolean isStarted() { + return started.get(); + } + + + public boolean isClosed() { + return !isStarted(); + } +}
