Migrate rocketmq-jms into this repository, under the rocketmq-jms/ directory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/fab94406 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/fab94406 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/fab94406 Branch: refs/heads/master Commit: fab9440608cb3635c7fabf9bc9005462ba3b920a Parents: a85d9c7 Author: yukon <[email protected]> Authored: Wed Mar 15 15:52:45 2017 +0800 Committer: yukon <[email protected]> Committed: Wed Mar 15 16:12:41 2017 +0800 ---------------------------------------------------------------------- .gitignore | 5 - .travis.yml | 43 -- README.md | 31 -- core/pom.xml | 48 -- .../rocketmq/jms/domain/CommonConstant.java | 36 -- .../rocketmq/jms/domain/CommonContext.java | 183 -------- .../rocketmq/jms/domain/JmsBaseConnection.java | 172 -------- .../jms/domain/JmsBaseConnectionFactory.java | 146 ------- .../jms/domain/JmsBaseConnectionMetaData.java | 134 ------ .../rocketmq/jms/domain/JmsBaseConstant.java | 86 ---- .../jms/domain/JmsBaseMessageConsumer.java | 168 ------- .../jms/domain/JmsBaseMessageProducer.java | 281 ------------ .../rocketmq/jms/domain/JmsBaseSession.java | 308 ------------- .../rocketmq/jms/domain/JmsBaseTopic.java | 53 --- .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ------ .../jms/domain/message/JmsBaseMessage.java | 434 ------------------- .../jms/domain/message/JmsBytesMessage.java | 245 ----------- .../jms/domain/message/JmsObjectMessage.java | 41 -- .../jms/domain/message/JmsTextMessage.java | 48 -- .../apache/rocketmq/jms/util/ExceptionUtil.java | 41 -- .../rocketmq/jms/util/MessageConverter.java | 182 -------- .../rocketmq/jms/util/MsgConvertUtil.java | 90 ---- .../apache/rocketmq/jms/util/URISpecParser.java | 61 --- core/src/main/resources/application.conf | 1 - .../apache/rocketmq/jms/JmsTestListener.java | 67 --- .../org/apache/rocketmq/jms/JmsTestUtil.java | 54 --- 
.../jms/domain/message/JmsBytesMessageTest.java | 103 ----- .../domain/message/JmsMessageConvertTest.java | 52 --- .../domain/message/JmsObjectMessageTest.java | 92 ---- .../jms/domain/message/JmsTextMessageTest.java | 50 --- .../jms/integration/IntegrationTestBase.java | 199 --------- .../rocketmq/jms/integration/JmsClientIT.java | 191 -------- .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ------ .../rocketmq/jms/util/URISpecParserTest.java | 43 -- pom.xml | 196 --------- rocketmq-jms/.gitignore | 5 + rocketmq-jms/.travis.yml | 43 ++ rocketmq-jms/README.md | 31 ++ rocketmq-jms/core/pom.xml | 48 ++ .../rocketmq/jms/domain/CommonConstant.java | 36 ++ .../rocketmq/jms/domain/CommonContext.java | 183 ++++++++ .../rocketmq/jms/domain/JmsBaseConnection.java | 172 ++++++++ .../jms/domain/JmsBaseConnectionFactory.java | 146 +++++++ .../jms/domain/JmsBaseConnectionMetaData.java | 134 ++++++ .../rocketmq/jms/domain/JmsBaseConstant.java | 86 ++++ .../jms/domain/JmsBaseMessageConsumer.java | 168 +++++++ .../jms/domain/JmsBaseMessageProducer.java | 281 ++++++++++++ .../rocketmq/jms/domain/JmsBaseSession.java | 308 +++++++++++++ .../rocketmq/jms/domain/JmsBaseTopic.java | 53 +++ .../rocketmq/jms/domain/RMQPushConsumerExt.java | 128 ++++++ .../jms/domain/message/JmsBaseMessage.java | 434 +++++++++++++++++++ .../jms/domain/message/JmsBytesMessage.java | 245 +++++++++++ .../jms/domain/message/JmsObjectMessage.java | 41 ++ .../jms/domain/message/JmsTextMessage.java | 48 ++ .../apache/rocketmq/jms/util/ExceptionUtil.java | 41 ++ .../rocketmq/jms/util/MessageConverter.java | 182 ++++++++ .../rocketmq/jms/util/MsgConvertUtil.java | 90 ++++ .../apache/rocketmq/jms/util/URISpecParser.java | 61 +++ .../core/src/main/resources/application.conf | 1 + .../apache/rocketmq/jms/JmsTestListener.java | 67 +++ .../org/apache/rocketmq/jms/JmsTestUtil.java | 54 +++ .../jms/domain/message/JmsBytesMessageTest.java | 103 +++++ .../domain/message/JmsMessageConvertTest.java | 52 +++ 
.../domain/message/JmsObjectMessageTest.java | 92 ++++ .../jms/domain/message/JmsTextMessageTest.java | 50 +++ .../jms/integration/IntegrationTestBase.java | 199 +++++++++ .../rocketmq/jms/integration/JmsClientIT.java | 191 ++++++++ .../rocketmq/jms/integration/JmsConsumerIT.java | 131 ++++++ .../rocketmq/jms/util/URISpecParserTest.java | 43 ++ rocketmq-jms/pom.xml | 196 +++++++++ rocketmq-jms/spring/pom.xml | 82 ++++ .../SimpleExMessageListenerContainer.java | 90 ++++ .../rocketmq/jms/spring/JmsConsumeIT.java | 61 +++ .../rocketmq/jms/spring/JmsProduceIT.java | 93 ++++ .../rocketmq/jms/spring/SpringTestBase.java | 41 ++ .../spring/src/test/resources/consumer.xml | 51 +++ .../spring/src/test/resources/producer.xml | 43 ++ rocketmq-jms/style/copyright/Apache.xml | 24 + .../style/copyright/profiles_settings.xml | 64 +++ rocketmq-jms/style/rmq_checkstyle.xml | 135 ++++++ rocketmq-jms/style/rmq_codeStyle.xml | 157 +++++++ spring/pom.xml | 82 ---- .../SimpleExMessageListenerContainer.java | 90 ---- .../rocketmq/jms/spring/JmsConsumeIT.java | 61 --- .../rocketmq/jms/spring/JmsProduceIT.java | 93 ---- .../rocketmq/jms/spring/SpringTestBase.java | 41 -- spring/src/test/resources/consumer.xml | 51 --- spring/src/test/resources/producer.xml | 43 -- style/copyright/Apache.xml | 24 - style/copyright/profiles_settings.xml | 64 --- style/rmq_checkstyle.xml | 135 ------ style/rmq_codeStyle.xml | 157 ------- 92 files changed, 4984 insertions(+), 4984 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore deleted file mode 100644 index d2e5aaf..0000000 --- a/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -.idea/ -*.iml -*.ipr -*.iws -target/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/.travis.yml 
---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 9f430b2..0000000 --- a/.travis.yml +++ /dev/null @@ -1,43 +0,0 @@ -notifications: - email: - recipients: - - [email protected] - - [email protected] - on_success: change - on_failure: always - -language: java - -matrix: - include: - # On OSX, run with default JDK only. - # - os: osx - # On Linux, run with specific JDKs only. - # - os: linux - # env: CUSTOM_JDK="oraclejdk8" - - os: linux - env: CUSTOM_JDK="oraclejdk7" - #- os: linux - # env: CUSTOM_JDK="openjdk7" - -before_install: - - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc - - cat ~/.mavenrc - - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi - -#os: -# - linux -# - osx -#jdk: -# - oraclejdk8 -# - oraclejdk7 -# - openjdk7 - - -script: - - travis_retry mvn -B clean install jacoco:report coveralls:report - -#after_success: -# - mvn clean install -# - mvn sonar:sonar http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md deleted file mode 100644 index a05e27e..0000000 --- a/README.md +++ /dev/null @@ -1,31 +0,0 @@ -# RocketMQ-JMS [](https://travis-ci.org/rocketmq/rocketmq-jms) [](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master) - - -## Introduction -RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as broker. -Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target. - -Now RocketMQ-JMS will release the first version soon, and new features will be developed on the branch "v1.1". -Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see features in next version. 
- - -## Building - - > cd rocketmq-jms - > mvn clean install - - **run unit test:** - > mvn test - - **run integration test:** - > mvn verify - - **see jacoco code coverage report** - > open core/target/site/jacoco/index.html - > open core/target/site/jacoco-it/index.html - > open spring/target/site/jacoco-it/index.html - - -## Guidelines - - Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml deleted file mode 100644 index 1b36e14..0000000 --- a/core/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>rocketmq-jms-all</artifactId> - <groupId>org.apache.rocketmq</groupId> - <version>1.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>rocketmq-jms</artifactId> - <version>1.0-SNAPSHOT</version> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java deleted file mode 100644 index 80a8b64..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -public interface CommonConstant { - - String PRODUCERID = "producerId"; - - String CONSUMERID = "consumerId"; - - String PROVIDER = "provider"; - - String NAMESERVER = "nameServer"; - - String INSTANCE_NAME = "instanceName"; - - String CONSUME_THREAD_NUMS = "consumeThreadNums"; - - String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis"; - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java deleted file mode 100644 index c8e4276..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import org.apache.commons.lang.builder.ReflectionToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; - -public class CommonContext { - private String accessKey; - private String secretKey; - - private String consumerId; - private String producerId; - - private String provider; - - private String appId; - - private String nameServer; - - /** - * MQType - */ - private String mqType; - - /** - * Using for distinguishing client jvm process - */ - private String instanceName; - /** - * Set consumer threadPool Size - */ - private int consumeThreadNums; - /** - * Set send message timeOut - */ - private int sendMsgTimeoutMillis = -1; - - /** - * @return the appId - */ - public String getAppId() { - return appId; - } - - /** - * @param appId the appId to set - */ - public void setAppId(String appId) { - this.appId = appId; - } - - /** - * @return the provider - */ - public String getProvider() { - return provider; - } - - /** - * @param provider the provider to set - */ - public void setProvider(String provider) { - this.provider = provider; - } - - /** - * @return the instanceName - */ - public String getInstanceName() { - return instanceName; - } - - /** - * @param instanceName the instanceName to set - */ - public void setInstanceName(String instanceName) { - this.instanceName = instanceName; - } - - /** - * @return the accessKey - */ - public String getAccessKey() { - return accessKey; - } - - /** - * @param accessKey the accessKey to set - */ - public void setAccessKey(String accessKey) 
{ - this.accessKey = accessKey; - } - - /** - * @return the secretKey - */ - public String getSecretKey() { - return secretKey; - } - - /** - * @param secretKey the secretKey to set - */ - public void setSecretKey(String secretKey) { - this.secretKey = secretKey; - } - - /** - * @return consumer thread nums - */ - public int getConsumeThreadNums() { - return consumeThreadNums; - } - - /** - * @param consumeThreadNums - */ - public void setConsumeThreadNums(int consumeThreadNums) { - this.consumeThreadNums = consumeThreadNums; - } - - public String getConsumerId() { - return consumerId; - } - - public void setConsumerId(String consumerId) { - this.consumerId = consumerId; - } - - public String getProducerId() { - return producerId; - } - - public void setProducerId(String producerId) { - this.producerId = producerId; - } - - public int getSendMsgTimeoutMillis() { - return sendMsgTimeoutMillis; - } - - public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) { - this.sendMsgTimeoutMillis = sendMsgTimeoutMillis; - } - - public String getMqType() { - return mqType; - } - - public void setMqType(String mqType) { - this.mqType = mqType; - } - - public String getNameServer() { - return nameServer; - } - - public void setNameServer(String nameServer) { - this.nameServer = nameServer; - } - - @Override - public String toString() { - return ReflectionToStringBuilder.toString(this, ToStringStyle.DEFAULT_STYLE); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java deleted file mode 100644 index 4c809c7..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Preconditions; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.commons.lang.StringUtils; - -public class JmsBaseConnection implements Connection { - private final AtomicBoolean started = new AtomicBoolean(false); - protected String clientID; - protected ExceptionListener exceptionListener; - protected CommonContext context; - protected JmsBaseSession session; - - public JmsBaseConnection(Map<String, String> connectionParams) { - - this.clientID = UUID.randomUUID().toString(); - - context = new CommonContext(); - - //At lease one should be set - context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID)); - context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID)); - - //optional - 
context.setProvider(connectionParams.get(CommonConstant.PROVIDER)); - - String nameServer = connectionParams.get(CommonConstant.NAMESERVER); - String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS); - String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS); - String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME); - - if (StringUtils.isNotEmpty(nameServer)) { - context.setNameServer(nameServer); - } - if (StringUtils.isNotEmpty(instanceName)) { - context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME)); - } - - if (StringUtils.isNotEmpty(consumerThreadNums)) { - context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums)); - } - if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) { - context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis)); - } - } - - @Override - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - - Preconditions.checkArgument(!transacted, "Not support transaction Session !"); - Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode, - "Not support this acknowledge mode: " + acknowledgeMode); - - if (null != this.session) { - return this.session; - } - synchronized (this) { - if (null != this.session) { - return this.session; - } - this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context); - if (isStarted()) { - this.session.start(); - } - return this.session; - } - } - - @Override - public String getClientID() throws JMSException { - return this.clientID; - } - - @Override - public void setClientID(String clientID) throws JMSException { - this.clientID = clientID; - } - - @Override - public ConnectionMetaData getMetaData() throws JMSException { - return new JmsBaseConnectionMetaData(); - } - - @Override - public ExceptionListener getExceptionListener() throws JMSException { - return this.exceptionListener; - } - - @Override - public void 
setExceptionListener(ExceptionListener listener) throws JMSException { - this.exceptionListener = listener; - } - - @Override - public void start() throws JMSException { - if (started.compareAndSet(false, true)) { - if (this.session != null) { - this.session.start(); - } - - } - } - - @Override - public void stop() throws JMSException { - //Stop the connection before closing it. - //Do nothing here. - } - - @Override - public void close() throws JMSException { - if (started.compareAndSet(true, false)) { - if (this.session != null) { - this.session.close(); - } - - } - } - - @Override - public ConnectionConsumer createConnectionConsumer(Destination destination, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - /** - * Whether the connection is started. - * - * @return whether the connection is started. - */ - public boolean isStarted() { - return started.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java deleted file mode 100644 index 1b9da06..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Preconditions; -import java.net.URI; -import java.util.Map; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import org.apache.rocketmq.jms.util.URISpecParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsBaseConnectionFactory implements ConnectionFactory { - - private static Logger logger = LoggerFactory - .getLogger(JmsBaseConnectionFactory.class); - /** - * Synchronization monitor for the shared Connection - */ - private final Object connectionMonitor = new Object(); - /** - * Can be configured in a consistent way without too much URL hacking. - */ - protected URI connectionUri; - /** - * Store connection uri query parameters. 
- */ - protected Map<String, String> connectionParams; - /** - * Wrapped Connection - */ - protected JmsBaseConnection connection; - - public JmsBaseConnectionFactory() { - - } - - public JmsBaseConnectionFactory(URI connectionUri) { - setConnectionUri(connectionUri); - } - - public void setConnectionUri(URI connectionUri) { - Preconditions.checkNotNull(connectionUri, "Please set URI !"); - this.connectionUri = connectionUri; - this.connectionParams = URISpecParser.parseURI(connectionUri.toString()); - - if (null != connectionParams) { - Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) || - null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !"); - } - - } - - @Override - public Connection createConnection() throws JMSException { - synchronized (this.connectionMonitor) { - if (this.connection == null) { - initConnection(); - } - return this.connection; - } - } - - /** - * Using userName and Password to create a connection - * - * @param userName ignored - * @param password ignored - * @return the new JMS Connection - * @throws JMSException - */ - @Override - public Connection createConnection(String userName, String password) throws JMSException { - logger.debug("Using userName and Password to create a connection."); - return this.createConnection(); - } - - /** - * Initialize the underlying shared Connection. - * <p/> - * Closes and reInitializes the Connection if an underlying Connection is present already. - * - * @throws javax.jms.JMSException if thrown by JMS API methods - */ - protected void initConnection() throws JMSException { - synchronized (this.connectionMonitor) { - if (this.connection != null) { - closeConnection(this.connection); - } - this.connection = doCreateConnection(); - logger.debug("Established shared JMS Connection: {}", this.connection); - } - } - - /** - * Close the given Connection. 
- * - * @param con the Connection to close - */ - protected void closeConnection(Connection con) { - logger.debug("Closing shared JMS Connection: {}", this.connection); - try { - try { - con.stop(); - } - finally { - con.close(); - } - } - catch (Throwable ex) { - logger.error("Could not close shared JMS Connection.", ex); - } - } - - /** - * Create a JMS Connection - * - * @return the new JMS Connection - * @throws javax.jms.JMSException if thrown by JMS API methods - */ - protected JmsBaseConnection doCreateConnection() throws JMSException { - Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0, - "Connection Parameters can not be null!"); - this.connection = new JmsBaseConnection(this.connectionParams); - - return connection; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java deleted file mode 100644 index ee549aa..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Enumeration; -import java.util.Properties; -import java.util.Vector; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.jms.ConnectionMetaData; -import javax.jms.JMSException; - -public class JmsBaseConnectionMetaData implements ConnectionMetaData { - public static final String JMS_VERSION; - public static final int JMS_MAJOR_VERSION; - public static final int JMS_MINOR_VERSION; - - public static final String PROVIDER_VERSION; - public static final int PROVIDER_MAJOR_VERSION; - public static final int PROVIDER_MINOR_VERSION; - - public static final String PROVIDER_NAME = "Apache RocketMQ"; - - public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData(); - - public static InputStream resourceStream; - - static { - Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); - - String jmsVersion = null; - int jmsMajor = 0; - int jmsMinor = 0; - try { - Package p = Package.getPackage("javax.jms"); - if (p != null) { - jmsVersion = p.getImplementationVersion(); - Matcher m = pattern.matcher(jmsVersion); - if (m.matches()) { - jmsMajor = Integer.parseInt(m.group(1)); - jmsMinor = Integer.parseInt(m.group(2)); - } - } - } - catch (Throwable e) { - } - JMS_VERSION = jmsVersion; - JMS_MAJOR_VERSION = jmsMajor; - JMS_MINOR_VERSION = jmsMinor; - - String providerVersion = null; - int providerMajor = 0; - int providerMinor = 0; - Properties properties = new Properties(); - try { - 
resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf"); - properties.load(resourceStream); - providerVersion = properties.getProperty("version"); - - Matcher m = pattern.matcher(providerVersion); - if (m.matches()) { - providerMajor = Integer.parseInt(m.group(1)); - providerMinor = Integer.parseInt(m.group(2)); - } - } - catch (IOException e) { - e.printStackTrace(); - } - PROVIDER_VERSION = providerVersion; - PROVIDER_MAJOR_VERSION = providerMajor; - PROVIDER_MINOR_VERSION = providerMinor; - - } - - public String getJMSVersion() throws JMSException { - return JMS_VERSION; - } - - public int getJMSMajorVersion() throws JMSException { - return JMS_MAJOR_VERSION; - } - - public int getJMSMinorVersion() throws JMSException { - return JMS_MINOR_VERSION; - } - - public String getJMSProviderName() throws JMSException { - return PROVIDER_NAME; - } - - public String getProviderVersion() throws JMSException { - return PROVIDER_VERSION; - } - - public int getProviderMajorVersion() throws JMSException { - return PROVIDER_MAJOR_VERSION; - } - - public int getProviderMinorVersion() throws JMSException { - return PROVIDER_MINOR_VERSION; - } - - public Enumeration<?> getJMSXPropertyNames() throws JMSException { - Vector<String> jmxProperties = new Vector<String>(); - jmxProperties.add("jmsXUserId"); - jmxProperties.add("jmsXAppId"); - jmxProperties.add("jmsXGroupID"); - jmxProperties.add("jmsXGroupSeq"); - jmxProperties.add("jmsXState"); - jmxProperties.add("jmsXDeliveryCount"); - jmxProperties.add("jmsXProducerTXID"); - jmxProperties.add("jmsConsumerTXID"); - jmxProperties.add("jmsRecvTimeStamp"); - return jmxProperties.elements(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java 
b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java deleted file mode 100644 index f0bca28..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain; - -public interface JmsBaseConstant { - //------------------------JMS message header constant--------------------------------- - String JMS_DESTINATION = "jmsDestination"; - String JMS_DELIVERY_MODE = "jmsDeliveryMode"; - String JMS_EXPIRATION = "jmsExpiration"; - String JMS_DELIVERY_TIME = "jmsDeliveryTime"; - String JMS_PRIORITY = "jmsPriority"; - String JMS_MESSAGE_ID = "jmsMessageID"; - String JMS_TIMESTAMP = "jmsTimestamp"; - String JMS_CORRELATION_ID = "jmsCorrelationID"; - String JMS_REPLY_TO = "jmsReplyTo"; - String JMS_TYPE = "jmsType"; - String JMS_REDELIVERED = "jmsRedelivered"; - - //-------------------------JMS defined properties constant---------------------------- - /** - * The identity of the user sending the Send message - */ - String JMS_XUSER_ID = "jmsXUserID"; - /** - * The identity of the application Send sending the message - */ - String JMS_XAPP_ID = "jmsXAppID"; - /** - * The number of message delivery Receive attempts - */ - String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount"; - /** - * The identity of the message group this message is part of - */ - String JMS_XGROUP_ID = "jmsXGroupID"; - /** - * The sequence number of this message within the group; the first message is 1, the second 2,... - */ - String JMS_XGROUP_SEQ = "jmsXGroupSeq"; - /** - * The transaction identifier of the Send transaction within which this message was produced - */ - String JMS_XPRODUCER_TXID = "jmsXProducerTXID"; - /** - * The transaction identifier of the Receive transaction within which this message was consumed - */ - String JMS_XCONSUMER_TXID = "jmsXConsumerTXID"; - - /** - * The time JMS delivered the Receive message to the consumer - */ - String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp"; - /** - * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and - * that these copies exist from the time the original message was sent. 
Each copy's state is one of: 1(waiting), - * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided - * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this. - */ - String JMS_XSTATE = "jmsXState"; - - //---------------------------JMS Headers' value constant--------------------------- - /** - * Default time to live - */ - long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000; - - /** - * Default Jms Type - */ - String DEFAULT_JMS_TYPE = "rocketmq"; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java deleted file mode 100644 index b62e928..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.MapMaker; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.jms.util.ExceptionUtil; - -public class JmsBaseMessageConsumer implements MessageConsumer { - - private static final Object LOCK_OBJECT = new Object(); - //all shared consumers - private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap(); - private final AtomicBoolean closed = new AtomicBoolean(false); - private CommonContext context; - private Destination destination; - private MessageListener messageListener; - - public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext, - JmsBaseConnection connection) throws JMSException { - synchronized (LOCK_OBJECT) { - checkArgs(destination, commonContext); - - if (null == consumerMap.get(context.getConsumerId())) { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId()); - if (context.getConsumeThreadNums() > 0) { - consumer.setConsumeThreadMax(context.getConsumeThreadNums()); - consumer.setConsumeThreadMin(context.getConsumeThreadNums()); - } - if (!Strings.isNullOrEmpty(context.getNameServer())) { - consumer.setNamesrvAddr(context.getNameServer()); - } - if (!Strings.isNullOrEmpty(context.getInstanceName())) { - consumer.setInstanceName(context.getInstanceName()); - } - consumer.setConsumeMessageBatchMaxSize(1); - //add subscribe? 
- RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer); - consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt); - } - - consumerMap.get(context.getConsumerId()).incrementAndGet(); - - //If the connection has been started, start the consumer right now. - //add start status? - RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId()); - if (connection.isStarted()) { - try { - consumerExt.start(); - } - catch (MQClientException mqe) { - JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId()); - jmsException.initCause(mqe); - throw jmsException; - } - } - } - - } - - private void checkArgs(Destination destination, CommonContext context) throws JMSException { - Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!"); - Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); - this.context = context; - this.destination = destination; - } - - @Override - public String getMessageSelector() throws JMSException { - return null; - } - - @Override - public MessageListener getMessageListener() throws JMSException { - return this.messageListener; - } - - @Override - public void setMessageListener(MessageListener listener) throws JMSException { - RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); - if (null != rocketmqConsumerExt) { - try { - this.messageListener = listener; - String messageTopic = ((JmsBaseTopic) destination).getMessageTopic(); - String messageType = ((JmsBaseTopic) destination).getMessageType(); - rocketmqConsumerExt.subscribe(messageTopic, messageType, listener); - } - catch (MQClientException mqe) { - //add what? 
- throw new JMSException(mqe.getMessage()); - } - - } - - } - - @Override - public Message receive() throws JMSException { - throw new UnsupportedOperationException("Unsupported!"); - } - - @Override - public Message receive(long timeout) throws JMSException { - throw new UnsupportedOperationException("Unsupported!"); - } - - @Override - public Message receiveNoWait() throws JMSException { - throw new UnsupportedOperationException("Unsupported!"); - } - - @Override - public void close() throws JMSException { - synchronized (LOCK_OBJECT) { - if (closed.compareAndSet(false, true)) { - RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); - if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) { - rocketmqConsumerExt.close(); - consumerMap.remove(context.getConsumerId()); - } - } - } - } - - /** - * Start the consumer to get message from the Broker. - */ - public void startConsumer() throws JMSException { - RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); - if (null != rocketmqConsumerExt) { - try { - rocketmqConsumerExt.start(); - } - catch (MQClientException mqe) { - throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed"); - } - } - } - - public Destination getDestination() throws JMSException { - return this.destination; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java deleted file mode 100644 index 8dd82f0..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.MapMaker; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentMap; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.MQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.jms.domain.message.JmsBaseMessage; -import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; -import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; -import org.apache.rocketmq.jms.domain.message.JmsTextMessage; -import org.apache.rocketmq.jms.util.ExceptionUtil; -import org.apache.rocketmq.jms.util.MessageConverter; -import org.apache.rocketmq.jms.util.MsgConvertUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsBaseMessageProducer implements MessageProducer { - - private static final Object 
LOCK_OBJECT = new Object(); - private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap(); - private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class); - private CommonContext context; - - private Destination destination; - - public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException { - synchronized (LOCK_OBJECT) { - checkArgs(destination, context); - - if (null == producerMap.get(this.context.getProducerId())) { - DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId()); - if (!Strings.isNullOrEmpty(context.getNameServer())) { - producer.setNamesrvAddr(context.getNameServer()); - } - if (!Strings.isNullOrEmpty(context.getInstanceName())) { - producer.setInstanceName(context.getInstanceName()); - } - if (context.getSendMsgTimeoutMillis() > 0) { - producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis()); - } - try { - producer.start(); - } - catch (MQClientException mqe) { - throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId())); - } - producerMap.putIfAbsent(this.context.getProducerId(), producer); - } - - } - } - - private void checkArgs(Destination destination, CommonContext context) throws JMSException { - Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!"); - Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); - this.context = context; - this.destination = destination; - } - - @Override - public boolean getDisableMessageID() throws JMSException { - return false; - } - - @Override - public void setDisableMessageID(boolean value) throws JMSException { - ExceptionUtil.handleUnSupportedException(); - } - - @Override - public boolean getDisableMessageTimestamp() throws JMSException { - return false; - } - - @Override - public void setDisableMessageTimestamp(boolean value) throws JMSException { - 
ExceptionUtil.handleUnSupportedException(); - } - - @Override - public int getDeliveryMode() throws JMSException { - return javax.jms.Message.DEFAULT_DELIVERY_MODE; - } - - @Override - public void setDeliveryMode(int deliveryMode) throws JMSException { - ExceptionUtil.handleUnSupportedException(); - } - - @Override - public int getPriority() throws JMSException { - return javax.jms.Message.DEFAULT_PRIORITY; - } - - @Override - public void setPriority(int defaultPriority) throws JMSException { - ExceptionUtil.handleUnSupportedException(); - } - - @Override - public long getTimeToLive() throws JMSException { - return JmsBaseConstant.DEFAULT_TIME_TO_LIVE; - } - - @Override - public void setTimeToLive(long timeToLive) throws JMSException { - ExceptionUtil.handleUnSupportedException(); - } - - @Override - public Destination getDestination() throws JMSException { - return this.destination; - } - - @Override - public void close() throws JMSException { - //Nothing to do - } - - @Override - public void send(javax.jms.Message message) throws JMSException { - this.send(getDestination(), message); - } - - /** - * Send the message to the defined Destination success---return normally. Exception---throw out JMSException. - * - * @param destination see <CODE>Destination</CODE> - * @param message the message to be sent. 
- * @throws javax.jms.JMSException - */ - @Override - public void send(Destination destination, javax.jms.Message message) throws JMSException { - JmsBaseMessage jmsMsg = (JmsBaseMessage) message; - initJMSHeaders(jmsMsg, destination); - - try { - if (context == null) { - throw new IllegalStateException("Context should be inited"); - } - org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg); - - MQProducer producer = producerMap.get(context.getProducerId()); - - if (producer == null) { - throw new Exception("producer is null "); - } - SendResult sendResult = producer.send(rocketmqMsg); - if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { - jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId()); - } else { - throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString())); - } - } - catch (Exception e) { - logger.error("Send rocketmq message failure !", e); - //if fail to send the message, throw out JMSException - JMSException jmsException = new JMSException("Send rocketmq message failure!"); - jmsException.setLinkedException(e); - throw jmsException; - } - } - - @Override - public void send(javax.jms.Message message, int deliveryMode, int priority, - long timeToLive) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public void send(Destination destination, javax.jms.Message message, int deliveryMode, - int priority, long timeToLive) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - /** - * Init the JmsMessage Headers. - * <p/> - * <P>JMS providers init message's headers. Do not allow user to set these by yourself. 
- * - * @param jmsMsg message - * @param destination - * @throws javax.jms.JMSException - * @see <CODE>Destination</CODE> - */ - private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException { - - //JMS_DESTINATION default:"topic:message" - jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination); - //JMS_DELIVERY_MODE default : PERSISTENT - jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); - //JMS_TIMESTAMP default : current time - jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, System.currentTimeMillis()); - //JMS_EXPIRATION default : 3 days - //JMS_EXPIRATION = currentTime + time_to_live - jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE); - //JMS_PRIORITY default : 4 - jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); - //JMS_TYPE default : open notification service - jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE); - //JMS_REPLY_TO,JMS_CORRELATION_ID default : null - //JMS_MESSAGE_ID is set by sendResult. - //JMS_REDELIVERED is set by broker. - } - - /** - * Init the OnsMessage Headers. - * <p/> - * <P>When converting JmsMessage to OnsMessage, should read from the JmsMessage's Properties and write to the - * OnsMessage's Properties. 
- * - * @param jmsMsg message - * @throws javax.jms.JMSException - */ - public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg, - String topic, String messageType) throws JMSException { - Properties userProperties = new Properties(); - - //Jms userProperties to properties - Map<String, Object> userProps = jmsMsg.getProperties(); - Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator(); - while (userPropsIter.hasNext()) { - Map.Entry<String, Object> entry = userPropsIter.next(); - userProperties.setProperty(entry.getKey(), entry.getValue().toString()); - } - //Jms systemProperties to ROCKETMQ properties - Map<String, Object> sysProps = jmsMsg.getHeaders(); - Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator(); - while (sysPropsIter.hasNext()) { - Map.Entry<String, Object> entry = sysPropsIter.next(); - userProperties.setProperty(entry.getKey(), entry.getValue().toString()); - } - - //Jms message Model - if (jmsMsg instanceof JmsBytesMessage) { - userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES); - } - else if (jmsMsg instanceof JmsObjectMessage) { - userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ); - } - else if (jmsMsg instanceof JmsTextMessage) { - userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT); - } - - //message topic and tag - userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic); - userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType); - - return userProperties; - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java deleted file mode 
100644 index 5bf7005..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Preconditions; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; -import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; -import org.apache.rocketmq.jms.domain.message.JmsTextMessage; -import 
org.apache.rocketmq.jms.util.ExceptionUtil; - -public class JmsBaseSession implements Session { - protected CommonContext context; - protected JmsBaseConnection connection; - protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList = - new CopyOnWriteArrayList<JmsBaseMessageConsumer>(); - private boolean transacted = true; - private int acknowledgeMode = AUTO_ACKNOWLEDGE; - - public JmsBaseSession(JmsBaseConnection connection, boolean transacted, - int acknowledgeMode, CommonContext context) { - this.context = context; - this.acknowledgeMode = acknowledgeMode; - this.transacted = transacted; - this.connection = connection; - } - - @Override - public BytesMessage createBytesMessage() throws JMSException { - return new JmsBytesMessage(); - } - - @Override - public MapMessage createMapMessage() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public Message createMessage() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public ObjectMessage createObjectMessage() throws JMSException { - return new JmsObjectMessage(); - } - - @Override - public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - return new JmsObjectMessage(object); - } - - @Override - public StreamMessage createStreamMessage() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public TextMessage createTextMessage() throws JMSException { - return new JmsTextMessage(); - } - - @Override - public TextMessage createTextMessage(String text) throws JMSException { - return new JmsTextMessage(text); - } - - @Override - public boolean getTransacted() throws JMSException { - return this.transacted; - } - - @Override - public int getAcknowledgeMode() { - return this.acknowledgeMode; - } - - @Override - public void commit() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public 
void rollback() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public void close() throws JMSException { - for (JmsBaseMessageConsumer messageConsumer : consumerList) { - messageConsumer.close(); - } - consumerList.clear(); - } - - @Override - public void recover() throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public MessageListener getMessageListener() throws JMSException { - return null; - } - - @Override - public void setMessageListener(MessageListener listener) throws JMSException { - ExceptionUtil.handleUnSupportedException(); - } - - @Override - public void run() { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public MessageProducer createProducer(Destination destination) throws JMSException { - return new JmsBaseMessageProducer(destination, context); - } - - /** - * Create a MessageConsumer. - * <p/> - * <P>Create a durable consumer to the specified destination - * - * @param destination Equals to Topic:MessageType in ROCKETMQ - * @throws javax.jms.JMSException - * @see <CODE>Destination</CODE> - */ - @Override - public MessageConsumer createConsumer(Destination destination) throws JMSException { - JmsBaseMessageConsumer messageConsumer = new - JmsBaseMessageConsumer(destination, this.context, this.connection); - this.consumerList.addIfAbsent(messageConsumer); - return messageConsumer; - } - - /** - * Create a MessageConsumer with messageSelector. 
- * <p/> - * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages - * - * @param destination Equals to Topic:MessageType in ROCKETMQ - * @param messageSelector For filtering messages - * @throws javax.jms.JMSException - * @see <CODE>Destination</CODE> - */ - @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector) - throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - - } - - /** - * Create a MessageConsumer with messageSelector. - * <p/> - * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages and do not support this mechanism to reject - * messages from localhost. - * - * @param destination Equals to Topic:MessageType in ROCKETMQ - * @param messageSelector For filtering messages - * @param noLocal If true: reject messages from localhost - * @throws javax.jms.JMSException - * @see <CODE>Destination</CODE> - */ - @Override - public MessageConsumer createConsumer(Destination destination, String messageSelector, - boolean noLocal) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public Queue createQueue(String queueName) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public Topic createTopic(String topicName) throws JMSException { - Preconditions.checkNotNull(topicName); - List<String> msgTuple = Arrays.asList(topicName.split(":")); - - Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2, - "Destination must match messageTopic:messageType !"); - - //If messageType is null, use * instead. - if (1 == msgTuple.size()) { - return new JmsBaseTopic(msgTuple.get(0), "*"); - } - return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)); - } - - /** - * Create a MessageConsumer with durable subscription. - * <p/> - * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable - * subscription. 
So use <CODE>createConsumer(Destination)</CODE> instead of these method. - * - * @param topic destination - * @throws javax.jms.JMSException - * @see <CODE>Topic</CODE> - */ - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String name) - throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - /** - * Create a MessageConsumer with durable subscription. - * <p/> - * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable - * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method. - * - * @param topic destination - * @throws javax.jms.JMSException - * @see <CODE>Topic</CODE> - */ - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String name, - String messageSelector, - boolean noLocal) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public QueueBrowser createBrowser(Queue queue) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - @Override - public TemporaryQueue createTemporaryQueue() throws JMSException { - return new TemporaryQueue() { - public void delete() throws JMSException { - } - - public String getQueueName() throws JMSException { - return UUID.randomUUID().toString(); - } - }; - } - - @Override - public TemporaryTopic createTemporaryTopic() throws JMSException { - return new TemporaryTopic() { - public void delete() throws JMSException { - } - - public String getTopicName() throws JMSException { - return UUID.randomUUID().toString(); - } - }; - } - - @Override - public void unsubscribe(String name) throws JMSException { - throw new UnsupportedOperationException("Unsupported"); - } - - public void start() throws JMSException { - for 
(JmsBaseMessageConsumer messageConsumer : consumerList) { - messageConsumer.startConsumer(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java deleted file mode 100644 index b7e2fab..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.domain; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import javax.jms.JMSException; -import javax.jms.Topic; - -public class JmsBaseTopic implements Topic { - - private String messageTopic; - private String messageType; - - public JmsBaseTopic(String messageTopic, String messageType) { - Preconditions.checkNotNull(messageTopic); - Preconditions.checkNotNull(messageType); - - this.messageTopic = messageTopic; - this.messageType = messageType; - } - - public String getTopicName() throws JMSException { - return this.toString(); - } - - public String toString() { - return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType()); - } - - public String getMessageTopic() { - return messageTopic; - } - - public String getMessageType() { - return messageType; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java deleted file mode 100644 index 7a8a9f7..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.domain; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.client.consumer.MQPushConsumer; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.jms.util.MessageConverter; - -public class RMQPushConsumerExt { - private final MQPushConsumer consumer; - private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>(); - - private AtomicInteger referenceCount = new AtomicInteger(0); - private AtomicBoolean started = new AtomicBoolean(false); - - public RMQPushConsumerExt(MQPushConsumer consumer) { - this.consumer = consumer; - } - - public MQPushConsumer getConsumer() { - return consumer; - } - - public int incrementAndGet() { - return referenceCount.incrementAndGet(); - } - - public int decrementAndGet() { - return referenceCount.decrementAndGet(); - } - - public int getReferenceCount() { - return referenceCount.get(); - } - public void start() throws MQClientException { - if (consumer == null) { - throw new MQClientException(-1, "consumer is 
null"); - } - - if (this.started.compareAndSet(false, true)) { - this.consumer.registerMessageListener(new MessageListenerImpl()); - this.consumer.start(); - } - } - - - public void close() { - if (this.started.compareAndSet(true, false)) { - this.consumer.shutdown(); - } - } - - public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException { - if (null == topic) { - throw new MQClientException(-1, "topic is null"); - } - - if (null == listener) { - throw new MQClientException(-1, "listener is null"); - } - - try { - this.subscribeTable.put(topic, listener); - this.consumer.subscribe(topic, subExpression); - } catch (MQClientException e) { - throw new MQClientException("consumer subscribe exception", e); - } - } - - public void unsubscribe(String topic) { - if (null != topic) { - this.consumer.unsubscribe(topic); - } - } - - class MessageListenerImpl implements MessageListenerConcurrently { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) { - MessageExt msgRMQ = msgsRMQList.get(0); - javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic()); - if (null == listener) { - throw new RuntimeException("MessageListener is null"); - } - - try { - listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ)); - } - catch (Exception e) { - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - } - - - public boolean isStarted() { - return started.get(); - } - - - public boolean isClosed() { - return !isStarted(); - } -}
