http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java b/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java deleted file mode 100644 index af57f67..0000000 --- a/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.util; - -import java.util.Map; -import org.junit.Test; - -public class URISpecParserTest { - - @Test - public void parseURI_NormalTest() { - Map<String, String> result = URISpecParser.parseURI("rocketmq://localhost"); - System.out.println(result); - - result = URISpecParser - .parseURI("rocketmq://xxx?appId=test&consumerId=testGroup"); - System.out.println(result); - - result = URISpecParser.parseURI("rocketmq:!@#$%^&*()//localhost?appId=test!@#$%^&*()"); - System.out.println(result); - } - - @Test(expected = IllegalArgumentException.class) - public void parseURI_AbnormalTest() { - URISpecParser.parseURI("metaq3://localhost?appId=test"); - } - -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 469b9be..0000000 --- a/pom.xml +++ /dev/null @@ -1,196 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns="http://maven.apache.org/POM/4.0.0" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-jms-all</artifactId> - <packaging>pom</packaging> - <version>1.0-SNAPSHOT</version> - <modules> - <module>spring</module> - <module>core</module> - </modules> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <!--maven properties --> - <maven.test.skip>false</maven.test.skip> - <maven.javadoc.skip>true</maven.javadoc.skip> - <!-- compiler settings properties --> - <maven.compiler.source>1.6</maven.compiler.source> - <maven.compiler.target>1.6</maven.compiler.target> - <surefire.version>2.19.1</surefire.version> - <rocketmq.version>4.0.0-incubating</rocketmq.version> - - </properties> - <dependencies> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - <version>${rocketmq.version}</version> - </dependency> - <dependency> - <groupId>javax.jms</groupId> - <artifactId>jms-api</artifactId> - <version>1.1-rev-1</version> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>18.0</version> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.6</version> - </dependency> - - <!--test--> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - <version>4.12</version> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-namesrv</artifactId> - <version>${rocketmq.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-broker</artifactId> - <version>${rocketmq.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - - <build> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.5.1</version> - <configuration> - <source>${maven.compiler.source}</source> - <target>${maven.compiler.target}</target> - <compilerVersion>${maven.compiler.source}</compilerVersion> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - <version>0.7.8</version> - <executions> - <execution> - <id>default-prepare-agent</id> - <goals> - <goal>prepare-agent</goal> - </goals> - <configuration> - <destFile>${project.build.directory}/jacoco.exec</destFile> - </configuration> - </execution> - <execution> - <id>default-prepare-agent-integration</id> - <phase>pre-integration-test</phase> - <goals> - <goal>prepare-agent-integration</goal> - </goals> - <configuration> - <destFile>${project.build.directory}/jacoco-it.exec</destFile> - <propertyName>failsafeArgLine</propertyName> - </configuration> - </execution> - <execution> - <id>default-report</id> - <goals> - <goal>report</goal> - </goals> - </execution> - <execution> - <id>default-report-integration</id> - <goals> - <goal>report-integration</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <version>${surefire.version}</version> - </plugin> - <plugin> - <artifactId>maven-failsafe-plugin</artifactId> - <version>${surefire.version}</version> - <configuration> - <forkCount>1</forkCount> - <reuseForks>true</reuseForks> - <argLine>@{failsafeArgLine}</argLine> - </configuration> - <executions> - <execution> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.eluder.coveralls</groupId> - <artifactId>coveralls-maven-plugin</artifactId> - <version>4.3.0</version> - </plugin> - <plugin> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.17</version> - <executions> - <execution> - <id>verify</id> - <phase>verify</phase> - <configuration> - <configLocation>style/rmq_checkstyle.xml</configLocation> - <encoding>UTF-8</encoding> - <consoleOutput>true</consoleOutput> - <failsOnError>true</failsOnError> - </configuration> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - <resources> - <resource> - <directory>src/main/resources</directory> - <filtering>true</filtering> - </resource> - </resources> - </build> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore new file mode 100644 index 0000000..d2e5aaf --- /dev/null +++ b/rocketmq-jms/.gitignore @@ -0,0 +1,5 @@ +.idea/ +*.iml +*.ipr +*.iws +target/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/.travis.yml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml new file mode 100644 index 0000000..9f430b2 --- /dev/null +++ b/rocketmq-jms/.travis.yml @@ -0,0 +1,43 @@ +notifications: + email: + recipients: + - zhangke.huangs...@gmail.com + - zhendongli...@gmail.com + on_success: change + on_failure: always + +language: java + +matrix: + include: + # On OSX, run with default JDK only. + # - os: osx + # On Linux, run with specific JDKs only. + # - os: linux + # env: CUSTOM_JDK="oraclejdk8" + - os: linux + env: CUSTOM_JDK="oraclejdk7" + #- os: linux + # env: CUSTOM_JDK="openjdk7" + +before_install: + - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc + - cat ~/.mavenrc + - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi + - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi + +#os: +# - linux +# - osx +#jdk: +# - oraclejdk8 +# - oraclejdk7 +# - openjdk7 + + +script: + - travis_retry mvn -B clean install jacoco:report coveralls:report + +#after_success: +# - mvn clean install +# - mvn sonar:sonar http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md new file mode 100644 index 0000000..a05e27e --- /dev/null +++ b/rocketmq-jms/README.md @@ -0,0 +1,31 @@ +# RocketMQ-JMS [![Build Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms) [![Coverage Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master) + + +## Introduction +RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as broker. +Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target. + +Now RocketMQ-JMS will release the first version soon, and new features will be developed on the branch "v1.1". +Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see features in next version. + + +## Building + + > cd rocketmq-jms + > mvn clean install + + **run unit test:** + > mvn test + + **run integration test:** + > mvn verify + + **see jacoco code coverage report** + > open core/target/site/jacoco/index.html + > open core/target/site/jacoco-it/index.html + > open spring/target/site/jacoco-it/index.html + + +## Guidelines + + Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml new file mode 100644 index 0000000..1b36e14 --- /dev/null +++ b/rocketmq-jms/core/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-jms-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-jms</artifactId> + <version>1.0-SNAPSHOT</version> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java new file mode 100644 index 0000000..80a8b64 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +public interface CommonConstant { + + String PRODUCERID = "producerId"; + + String CONSUMERID = "consumerId"; + + String PROVIDER = "provider"; + + String NAMESERVER = "nameServer"; + + String INSTANCE_NAME = "instanceName"; + + String CONSUME_THREAD_NUMS = "consumeThreadNums"; + + String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis"; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java new file mode 100644 index 0000000..c8e4276 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +public class CommonContext { + private String accessKey; + private String secretKey; + + private String consumerId; + private String producerId; + + private String provider; + + private String appId; + + private String nameServer; + + /** + * MQType + */ + private String mqType; + + /** + * Using for distinguishing client jvm process + */ + private String instanceName; + /** + * Set consumer threadPool Size + */ + private int consumeThreadNums; + /** + * Set send message timeOut + */ + private int sendMsgTimeoutMillis = -1; + + /** + * @return the appId + */ + public String getAppId() { + return appId; + } + + /** + * @param appId the appId to set + */ + public void setAppId(String appId) { + this.appId = appId; + } + + /** + * @return the provider + */ + public String getProvider() { + return provider; + } + + /** + * @param provider the provider to set + */ + public void setProvider(String provider) { + this.provider = provider; + } + + /** + * @return the instanceName + */ + public String getInstanceName() { + return instanceName; + } + + /** + * @param instanceName the instanceName to set + */ + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + /** + * @return the accessKey + */ + public String getAccessKey() { + return accessKey; + } + + /** + * @param accessKey the accessKey to set + */ + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + /** + * @return the secretKey + */ + public String getSecretKey() { + return secretKey; + } + + /** + * @param secretKey the secretKey to set + */ + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + /** + * @return consumer thread nums + */ + public int getConsumeThreadNums() { + return consumeThreadNums; + } + + /** + * @param consumeThreadNums + */ + public void setConsumeThreadNums(int consumeThreadNums) { + this.consumeThreadNums = consumeThreadNums; + } + + public String getConsumerId() { + return consumerId; + } + + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + + public String getProducerId() { + return producerId; + } + + public void setProducerId(String producerId) { + this.producerId = producerId; + } + + public int getSendMsgTimeoutMillis() { + return sendMsgTimeoutMillis; + } + + public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) { + this.sendMsgTimeoutMillis = sendMsgTimeoutMillis; + } + + public String getMqType() { + return mqType; + } + + public void setMqType(String mqType) { + this.mqType = mqType; + } + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.DEFAULT_STYLE); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java new file mode 100644 index 0000000..4c809c7 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.commons.lang.StringUtils; + +public class JmsBaseConnection implements Connection { + private final AtomicBoolean started = new AtomicBoolean(false); + protected String clientID; + protected ExceptionListener exceptionListener; + protected CommonContext context; + protected JmsBaseSession session; + + public JmsBaseConnection(Map<String, String> connectionParams) { + + this.clientID = UUID.randomUUID().toString(); + + context = new CommonContext(); + + //At lease one should be set + context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID)); + context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID)); + + //optional + context.setProvider(connectionParams.get(CommonConstant.PROVIDER)); + + String nameServer = connectionParams.get(CommonConstant.NAMESERVER); + String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS); + String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS); + String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME); + + if (StringUtils.isNotEmpty(nameServer)) { + context.setNameServer(nameServer); + } + if (StringUtils.isNotEmpty(instanceName)) { + context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME)); + } + + if (StringUtils.isNotEmpty(consumerThreadNums)) { + context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums)); + } + if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) { + context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis)); + } + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + + Preconditions.checkArgument(!transacted, "Not support transaction Session !"); + Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode, + "Not support this acknowledge mode: " + acknowledgeMode); + + if (null != this.session) { + return this.session; + } + synchronized (this) { + if (null != this.session) { + return this.session; + } + this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context); + if (isStarted()) { + this.session.start(); + } + return this.session; + } + } + + @Override + public String getClientID() throws JMSException { + return this.clientID; + } + + @Override + public void setClientID(String clientID) throws JMSException { + this.clientID = clientID; + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return new JmsBaseConnectionMetaData(); + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + return this.exceptionListener; + } + + @Override + public void setExceptionListener(ExceptionListener listener) throws JMSException { + this.exceptionListener = listener; + } + + @Override + public void start() throws JMSException { + if (started.compareAndSet(false, true)) { + if (this.session != null) { + this.session.start(); + } + + } + } + + @Override + public void stop() throws JMSException { + //Stop the connection before closing it. + //Do nothing here. + } + + @Override + public void close() throws JMSException { + if (started.compareAndSet(true, false)) { + if (this.session != null) { + this.session.close(); + } + + } + } + + @Override + public ConnectionConsumer createConnectionConsumer(Destination destination, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Whether the connection is started. + * + * @return whether the connection is started. + */ + public boolean isStarted() { + return started.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java new file mode 100644 index 0000000..1b9da06 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.net.URI; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import org.apache.rocketmq.jms.util.URISpecParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsBaseConnectionFactory implements ConnectionFactory { + + private static Logger logger = LoggerFactory + .getLogger(JmsBaseConnectionFactory.class); + /** + * Synchronization monitor for the shared Connection + */ + private final Object connectionMonitor = new Object(); + /** + * Can be configured in a consistent way without too much URL hacking. + */ + protected URI connectionUri; + /** + * Store connection uri query parameters. + */ + protected Map<String, String> connectionParams; + /** + * Wrapped Connection + */ + protected JmsBaseConnection connection; + + public JmsBaseConnectionFactory() { + + } + + public JmsBaseConnectionFactory(URI connectionUri) { + setConnectionUri(connectionUri); + } + + public void setConnectionUri(URI connectionUri) { + Preconditions.checkNotNull(connectionUri, "Please set URI !"); + this.connectionUri = connectionUri; + this.connectionParams = URISpecParser.parseURI(connectionUri.toString()); + + if (null != connectionParams) { + Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) || + null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !"); + } + + } + + @Override + public Connection createConnection() throws JMSException { + synchronized (this.connectionMonitor) { + if (this.connection == null) { + initConnection(); + } + return this.connection; + } + } + + /** + * Using userName and Password to create a connection + * + * @param userName ignored + * @param password ignored + * @return the new JMS Connection + * @throws JMSException + */ + @Override + public Connection createConnection(String userName, String password) throws JMSException { + logger.debug("Using userName and Password to create a connection."); + return this.createConnection(); + } + + /** + * Initialize the underlying shared Connection. + * <p/> + * Closes and reInitializes the Connection if an underlying Connection is present already. + * + * @throws javax.jms.JMSException if thrown by JMS API methods + */ + protected void initConnection() throws JMSException { + synchronized (this.connectionMonitor) { + if (this.connection != null) { + closeConnection(this.connection); + } + this.connection = doCreateConnection(); + logger.debug("Established shared JMS Connection: {}", this.connection); + } + } + + /** + * Close the given Connection. + * + * @param con the Connection to close + */ + protected void closeConnection(Connection con) { + logger.debug("Closing shared JMS Connection: {}", this.connection); + try { + try { + con.stop(); + } + finally { + con.close(); + } + } + catch (Throwable ex) { + logger.error("Could not close shared JMS Connection.", ex); + } + } + + /** + * Create a JMS Connection + * + * @return the new JMS Connection + * @throws javax.jms.JMSException if thrown by JMS API methods + */ + protected JmsBaseConnection doCreateConnection() throws JMSException { + Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0, + "Connection Parameters can not be null!"); + this.connection = new JmsBaseConnection(this.connectionParams); + + return connection; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java new file mode 100644 index 0000000..ee549aa --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; +import java.util.Properties; +import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; + +public class JmsBaseConnectionMetaData implements ConnectionMetaData { + public static final String JMS_VERSION; + public static final int JMS_MAJOR_VERSION; + public static final int JMS_MINOR_VERSION; + + public static final String PROVIDER_VERSION; + public static final int PROVIDER_MAJOR_VERSION; + public static final int PROVIDER_MINOR_VERSION; + + public static final String PROVIDER_NAME = "Apache RocketMQ"; + + public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData(); + + public static InputStream resourceStream; + + static { + Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); + + String jmsVersion = null; + int jmsMajor = 0; + int jmsMinor = 0; + try { + Package p = Package.getPackage("javax.jms"); + if (p != null) { + jmsVersion = p.getImplementationVersion(); + Matcher m = pattern.matcher(jmsVersion); + if (m.matches()) { + jmsMajor = Integer.parseInt(m.group(1)); + jmsMinor = Integer.parseInt(m.group(2)); + } + } + } + catch (Throwable e) { + } + JMS_VERSION = jmsVersion; + JMS_MAJOR_VERSION = jmsMajor; + JMS_MINOR_VERSION = jmsMinor; + + String providerVersion = null; + int providerMajor = 0; + int providerMinor = 0; + Properties properties = new Properties(); + try { + resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf"); + properties.load(resourceStream); + providerVersion = properties.getProperty("version"); + + Matcher m = pattern.matcher(providerVersion); + if (m.matches()) { + providerMajor = Integer.parseInt(m.group(1)); + providerMinor = Integer.parseInt(m.group(2)); + } + } + catch (IOException e) { + e.printStackTrace(); + } + PROVIDER_VERSION = providerVersion; + PROVIDER_MAJOR_VERSION = providerMajor; + PROVIDER_MINOR_VERSION = providerMinor; + + } + + public String getJMSVersion() throws JMSException { + return JMS_VERSION; + } + + public int getJMSMajorVersion() throws JMSException { + return JMS_MAJOR_VERSION; + } + + public int getJMSMinorVersion() throws JMSException { + return JMS_MINOR_VERSION; + } + + public String getJMSProviderName() throws JMSException { + return PROVIDER_NAME; + } + + public String getProviderVersion() throws JMSException { + return PROVIDER_VERSION; + } + + public int getProviderMajorVersion() throws JMSException { + return PROVIDER_MAJOR_VERSION; + } + + public int getProviderMinorVersion() throws JMSException { + return PROVIDER_MINOR_VERSION; + } + + public Enumeration<?> getJMSXPropertyNames() throws JMSException { + Vector<String> jmxProperties = new Vector<String>(); + jmxProperties.add("jmsXUserId"); + jmxProperties.add("jmsXAppId"); + jmxProperties.add("jmsXGroupID"); + jmxProperties.add("jmsXGroupSeq"); + jmxProperties.add("jmsXState"); + jmxProperties.add("jmsXDeliveryCount"); + jmxProperties.add("jmsXProducerTXID"); + jmxProperties.add("jmsConsumerTXID"); + jmxProperties.add("jmsRecvTimeStamp"); + return jmxProperties.elements(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java new file mode 100644 index 0000000..f0bca28 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +public interface JmsBaseConstant { + //------------------------JMS message header constant--------------------------------- + String JMS_DESTINATION = "jmsDestination"; + String JMS_DELIVERY_MODE = "jmsDeliveryMode"; + String JMS_EXPIRATION = "jmsExpiration"; + String JMS_DELIVERY_TIME = "jmsDeliveryTime"; + String JMS_PRIORITY = "jmsPriority"; + String JMS_MESSAGE_ID = "jmsMessageID"; + String JMS_TIMESTAMP = "jmsTimestamp"; + String JMS_CORRELATION_ID = "jmsCorrelationID"; + String JMS_REPLY_TO = "jmsReplyTo"; + String JMS_TYPE = "jmsType"; + String JMS_REDELIVERED = "jmsRedelivered"; + + //-------------------------JMS defined properties constant---------------------------- + /** + * The identity of the user sending the Send message + */ + String JMS_XUSER_ID = "jmsXUserID"; + /** + * The identity of the application Send sending the message + */ + String JMS_XAPP_ID = "jmsXAppID"; + /** + * The number of message delivery Receive attempts + */ + String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount"; + /** + * The identity of the message group this message is part of + */ + String JMS_XGROUP_ID = "jmsXGroupID"; + /** + * The sequence number of this message within the group; the first message is 1, the second 2,... + */ + String JMS_XGROUP_SEQ = "jmsXGroupSeq"; + /** + * The transaction identifier of the Send transaction within which this message was produced + */ + String JMS_XPRODUCER_TXID = "jmsXProducerTXID"; + /** + * The transaction identifier of the Receive transaction within which this message was consumed + */ + String JMS_XCONSUMER_TXID = "jmsXConsumerTXID"; + + /** + * The time JMS delivered the Receive message to the consumer + */ + String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp"; + /** + * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and + * that these copies exist from the time the original message was sent. Each copyâs state is one of: 1(waiting), + * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided + * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this. + */ + String JMS_XSTATE = "jmsXState"; + + //---------------------------JMS Headers' value constant--------------------------- + /** + * Default time to live + */ + long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000; + + /** + * Default Jms Type + */ + String DEFAULT_JMS_TYPE = "rocketmq"; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java new file mode 100644 index 0000000..b62e928 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.MapMaker; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +public class JmsBaseMessageConsumer implements MessageConsumer { + + private static final Object LOCK_OBJECT = new Object(); + //all shared consumers + private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private CommonContext context; + private Destination destination; + private MessageListener messageListener; + + public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext, + JmsBaseConnection connection) throws JMSException { + synchronized (LOCK_OBJECT) { + checkArgs(destination, commonContext); + + if (null == consumerMap.get(context.getConsumerId())) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId()); + if (context.getConsumeThreadNums() > 0) { + consumer.setConsumeThreadMax(context.getConsumeThreadNums()); + consumer.setConsumeThreadMin(context.getConsumeThreadNums()); + } + if (!Strings.isNullOrEmpty(context.getNameServer())) { + consumer.setNamesrvAddr(context.getNameServer()); + } + if (!Strings.isNullOrEmpty(context.getInstanceName())) { + consumer.setInstanceName(context.getInstanceName()); + } + consumer.setConsumeMessageBatchMaxSize(1); + //add subscribe? + RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer); + consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt); + } + + consumerMap.get(context.getConsumerId()).incrementAndGet(); + + //If the connection has been started, start the consumer right now. + //add start status? + RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId()); + if (connection.isStarted()) { + try { + consumerExt.start(); + } + catch (MQClientException mqe) { + JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId()); + jmsException.initCause(mqe); + throw jmsException; + } + } + } + + } + + private void checkArgs(Destination destination, CommonContext context) throws JMSException { + Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!"); + Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); + this.context = context; + this.destination = destination; + } + + @Override + public String getMessageSelector() throws JMSException { + return null; + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return this.messageListener; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt) { + try { + this.messageListener = listener; + String messageTopic = ((JmsBaseTopic) destination).getMessageTopic(); + String messageType = ((JmsBaseTopic) destination).getMessageType(); + rocketmqConsumerExt.subscribe(messageTopic, messageType, listener); + } + catch (MQClientException mqe) { + //add what? + throw new JMSException(mqe.getMessage()); + } + + } + + } + + @Override + public Message receive() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public Message receive(long timeout) throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public Message receiveNoWait() throws JMSException { + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public void close() throws JMSException { + synchronized (LOCK_OBJECT) { + if (closed.compareAndSet(false, true)) { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) { + rocketmqConsumerExt.close(); + consumerMap.remove(context.getConsumerId()); + } + } + } + } + + /** + * Start the consumer to get message from the Broker. + */ + public void startConsumer() throws JMSException { + RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId()); + if (null != rocketmqConsumerExt) { + try { + rocketmqConsumerExt.start(); + } + catch (MQClientException mqe) { + throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed"); + } + } + } + + public Destination getDestination() throws JMSException { + return this.destination; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java new file mode 100644 index 0000000..8dd82f0 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.MapMaker; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentMap; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.jms.domain.message.JmsBaseMessage; +import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; +import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; +import org.apache.rocketmq.jms.domain.message.JmsTextMessage; +import org.apache.rocketmq.jms.util.ExceptionUtil; +import org.apache.rocketmq.jms.util.MessageConverter; +import org.apache.rocketmq.jms.util.MsgConvertUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsBaseMessageProducer implements MessageProducer { + + private static final Object LOCK_OBJECT = new Object(); + private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap(); + private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class); + private CommonContext context; + + private Destination destination; + + public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException { + synchronized (LOCK_OBJECT) { + checkArgs(destination, context); + + if (null == producerMap.get(this.context.getProducerId())) { + DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId()); + if (!Strings.isNullOrEmpty(context.getNameServer())) { + producer.setNamesrvAddr(context.getNameServer()); + } + if (!Strings.isNullOrEmpty(context.getInstanceName())) { + producer.setInstanceName(context.getInstanceName()); + } + if (context.getSendMsgTimeoutMillis() > 0) { + producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis()); + } + try { + producer.start(); + } + catch (MQClientException mqe) { + throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId())); + } + producerMap.putIfAbsent(this.context.getProducerId(), producer); + } + + } + } + + private void checkArgs(Destination destination, CommonContext context) throws JMSException { + Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!"); + Preconditions.checkNotNull(destination.toString(), "Destination can not be null!"); + this.context = context; + this.destination = destination; + } + + @Override + public boolean getDisableMessageID() throws JMSException { + return false; + } + + @Override + public void setDisableMessageID(boolean value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public boolean getDisableMessageTimestamp() throws JMSException { + return false; + } + + @Override + public void setDisableMessageTimestamp(boolean value) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public int getDeliveryMode() throws JMSException { + return javax.jms.Message.DEFAULT_DELIVERY_MODE; + } + + @Override + public void setDeliveryMode(int deliveryMode) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public int getPriority() throws JMSException { + return javax.jms.Message.DEFAULT_PRIORITY; + } + + @Override + public void setPriority(int defaultPriority) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public long getTimeToLive() throws JMSException { + return JmsBaseConstant.DEFAULT_TIME_TO_LIVE; + } + + @Override + public void setTimeToLive(long timeToLive) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public Destination getDestination() throws JMSException { + return this.destination; + } + + @Override + public void close() throws JMSException { + //Nothing to do + } + + @Override + public void send(javax.jms.Message message) throws JMSException { + this.send(getDestination(), message); + } + + /** + * Send the message to the defined Destination success---return normally. Exception---throw out JMSException. + * + * @param destination see <CODE>Destination</CODE> + * @param message the message to be sent. + * @throws javax.jms.JMSException + */ + @Override + public void send(Destination destination, javax.jms.Message message) throws JMSException { + JmsBaseMessage jmsMsg = (JmsBaseMessage) message; + initJMSHeaders(jmsMsg, destination); + + try { + if (context == null) { + throw new IllegalStateException("Context should be inited"); + } + org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg); + + MQProducer producer = producerMap.get(context.getProducerId()); + + if (producer == null) { + throw new Exception("producer is null "); + } + SendResult sendResult = producer.send(rocketmqMsg); + if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { + jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId()); + } else { + throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString())); + } + } + catch (Exception e) { + logger.error("Send rocketmq message failure !", e); + //if fail to send the message, throw out JMSException + JMSException jmsException = new JMSException("Send rocketmq message failure!"); + jmsException.setLinkedException(e); + throw jmsException; + } + } + + @Override + public void send(javax.jms.Message message, int deliveryMode, int priority, + long timeToLive) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void send(Destination destination, javax.jms.Message message, int deliveryMode, + int priority, long timeToLive) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Init the JmsMessage Headers. + * <p/> + * <P>JMS providers init message's headers. Do not allow user to set these by yourself. + * + * @param jmsMsg message + * @param destination + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException { + + //JMS_DESTINATION default:"topic:message" + jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination); + //JMS_DELIVERY_MODE default : PERSISTENT + jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); + //JMS_TIMESTAMP default : current time + jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, System.currentTimeMillis()); + //JMS_EXPIRATION default : 3 days + //JMS_EXPIRATION = currentTime + time_to_live + jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE); + //JMS_PRIORITY default : 4 + jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); + //JMS_TYPE default : open notification service + jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE); + //JMS_REPLY_TO,JMS_CORRELATION_ID default : null + //JMS_MESSAGE_ID is set by sendResult. + //JMS_REDELIVERED is set by broker. + } + + /** + * Init the OnsMessage Headers. + * <p/> + * <P>When converting JmsMessage to OnsMessage, should read from the JmsMessage's Properties and write to the + * OnsMessage's Properties. + * + * @param jmsMsg message + * @throws javax.jms.JMSException + */ + public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg, + String topic, String messageType) throws JMSException { + Properties userProperties = new Properties(); + + //Jms userProperties to properties + Map<String, Object> userProps = jmsMsg.getProperties(); + Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator(); + while (userPropsIter.hasNext()) { + Map.Entry<String, Object> entry = userPropsIter.next(); + userProperties.setProperty(entry.getKey(), entry.getValue().toString()); + } + //Jms systemProperties to ROCKETMQ properties + Map<String, Object> sysProps = jmsMsg.getHeaders(); + Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator(); + while (sysPropsIter.hasNext()) { + Map.Entry<String, Object> entry = sysPropsIter.next(); + userProperties.setProperty(entry.getKey(), entry.getValue().toString()); + } + + //Jms message Model + if (jmsMsg instanceof JmsBytesMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES); + } + else if (jmsMsg instanceof JmsObjectMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ); + } + else if (jmsMsg instanceof JmsTextMessage) { + userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT); + } + + //message topic and tag + userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic); + userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType); + + return userProperties; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java new file mode 100644 index 0000000..5bf7005 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.rocketmq.jms.domain.message.JmsBytesMessage; +import org.apache.rocketmq.jms.domain.message.JmsObjectMessage; +import org.apache.rocketmq.jms.domain.message.JmsTextMessage; +import org.apache.rocketmq.jms.util.ExceptionUtil; + +public class JmsBaseSession implements Session { + protected CommonContext context; + protected JmsBaseConnection connection; + protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList = + new CopyOnWriteArrayList<JmsBaseMessageConsumer>(); + private boolean transacted = true; + private int acknowledgeMode = AUTO_ACKNOWLEDGE; + + public JmsBaseSession(JmsBaseConnection connection, boolean transacted, + int acknowledgeMode, CommonContext context) { + this.context = context; + this.acknowledgeMode = acknowledgeMode; + this.transacted = transacted; + this.connection = connection; + } + + @Override + public BytesMessage createBytesMessage() throws JMSException { + return new JmsBytesMessage(); + } + + @Override + public MapMessage createMapMessage() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Message createMessage() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + return new JmsObjectMessage(); + } + + @Override + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + return new JmsObjectMessage(object); + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public TextMessage createTextMessage() throws JMSException { + return new JmsTextMessage(); + } + + @Override + public TextMessage createTextMessage(String text) throws JMSException { + return new JmsTextMessage(text); + } + + @Override + public boolean getTransacted() throws JMSException { + return this.transacted; + } + + @Override + public int getAcknowledgeMode() { + return this.acknowledgeMode; + } + + @Override + public void commit() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void rollback() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public void close() throws JMSException { + for (JmsBaseMessageConsumer messageConsumer : consumerList) { + messageConsumer.close(); + } + consumerList.clear(); + } + + @Override + public void recover() throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return null; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + ExceptionUtil.handleUnSupportedException(); + } + + @Override + public void run() { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + return new JmsBaseMessageProducer(destination, context); + } + + /** + * Create a MessageConsumer. + * <p/> + * <P>Create a durable consumer to the specified destination + * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + JmsBaseMessageConsumer messageConsumer = new + JmsBaseMessageConsumer(destination, this.context, this.connection); + this.consumerList.addIfAbsent(messageConsumer); + return messageConsumer; + } + + /** + * Create a MessageConsumer with messageSelector. + * <p/> + * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages + * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @param messageSelector For filtering messages + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) + throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + + } + + /** + * Create a MessageConsumer with messageSelector. + * <p/> + * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages and do not support this mechanism to reject + * messages from localhost. + * + * @param destination Equals to Topic:MessageType in ROCKETMQ + * @param messageSelector For filtering messages + * @param noLocal If true: reject messages from localhost + * @throws javax.jms.JMSException + * @see <CODE>Destination</CODE> + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector, + boolean noLocal) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Queue createQueue(String queueName) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public Topic createTopic(String topicName) throws JMSException { + Preconditions.checkNotNull(topicName); + List<String> msgTuple = Arrays.asList(topicName.split(":")); + + Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2, + "Destination must match messageTopic:messageType !"); + + //If messageType is null, use * instead. + if (1 == msgTuple.size()) { + return new JmsBaseTopic(msgTuple.get(0), "*"); + } + return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)); + } + + /** + * Create a MessageConsumer with durable subscription. + * <p/> + * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable + * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method. + * + * @param topic destination + * @throws javax.jms.JMSException + * @see <CODE>Topic</CODE> + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) + throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + /** + * Create a MessageConsumer with durable subscription. + * <p/> + * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable + * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method. + * + * @param topic destination + * @throws javax.jms.JMSException + * @see <CODE>Topic</CODE> + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name, + String messageSelector, + boolean noLocal) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + return new TemporaryQueue() { + public void delete() throws JMSException { + } + + public String getQueueName() throws JMSException { + return UUID.randomUUID().toString(); + } + }; + } + + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + return new TemporaryTopic() { + public void delete() throws JMSException { + } + + public String getTopicName() throws JMSException { + return UUID.randomUUID().toString(); + } + }; + } + + @Override + public void unsubscribe(String name) throws JMSException { + throw new UnsupportedOperationException("Unsupported"); + } + + public void start() throws JMSException { + for (JmsBaseMessageConsumer messageConsumer : consumerList) { + messageConsumer.startConsumer(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java new file mode 100644 index 0000000..b7e2fab --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import javax.jms.JMSException; +import javax.jms.Topic; + +public class JmsBaseTopic implements Topic { + + private String messageTopic; + private String messageType; + + public JmsBaseTopic(String messageTopic, String messageType) { + Preconditions.checkNotNull(messageTopic); + Preconditions.checkNotNull(messageType); + + this.messageTopic = messageTopic; + this.messageType = messageType; + } + + public String getTopicName() throws JMSException { + return this.toString(); + } + + public String toString() { + return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType()); + } + + public String getMessageTopic() { + return messageTopic; + } + + public String getMessageType() { + return messageType; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/fab94406/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java new file mode 100644 index 0000000..7a8a9f7 --- /dev/null +++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.domain; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.util.MessageConverter; + +public class RMQPushConsumerExt { + private final MQPushConsumer consumer; + private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>(); + + private AtomicInteger referenceCount = new AtomicInteger(0); + private AtomicBoolean started = new AtomicBoolean(false); + + public RMQPushConsumerExt(MQPushConsumer consumer) { + this.consumer = consumer; + } + + public MQPushConsumer getConsumer() { + return consumer; + } + + public int incrementAndGet() { + return referenceCount.incrementAndGet(); + } + + public int decrementAndGet() { + return referenceCount.decrementAndGet(); + } + + public int getReferenceCount() { + return referenceCount.get(); + } + public void start() throws MQClientException { + if (consumer == null) { + throw new MQClientException(-1, "consumer is null"); + } + + if (this.started.compareAndSet(false, true)) { + this.consumer.registerMessageListener(new MessageListenerImpl()); + this.consumer.start(); + } + } + + + public void close() { + if (this.started.compareAndSet(true, false)) { + this.consumer.shutdown(); + } + } + + public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException { + if (null == topic) { + throw new MQClientException(-1, "topic is null"); + } + + if (null == listener) { + throw new MQClientException(-1, "listener is null"); + } + + try { + this.subscribeTable.put(topic, listener); + this.consumer.subscribe(topic, subExpression); + } catch (MQClientException e) { + throw new MQClientException("consumer subscribe exception", e); + } + } + + public void unsubscribe(String topic) { + if (null != topic) { + this.consumer.unsubscribe(topic); + } + } + + class MessageListenerImpl implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) { + MessageExt msgRMQ = msgsRMQList.get(0); + javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic()); + if (null == listener) { + throw new RuntimeException("MessageListener is null"); + } + + try { + listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ)); + } + catch (Exception e) { + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + + public boolean isStarted() { + return started.get(); + } + + + public boolean isClosed() { + return !isStarted(); + } +}