http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java new file mode 100644 index 0000000..1043f5d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.jms.exception.MessageExpiredException; +import org.apache.rocketmq.jms.hook.ReceiveMessageHook; +import org.apache.rocketmq.jms.msg.convert.RMQ2JMSMessageConvert; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.String.format; + +/** + * Service deliver messages synchronous or asynchronous. + */ +public class DeliverMessageService extends ServiceThread { + + private static final Logger log = LoggerFactory.getLogger(DeliverMessageService.class); + private static final AtomicLong COUNTER = new AtomicLong(0L); + private static final int PULL_BATCH_SIZE = 100; + + private RocketMQConsumer consumer; + private DefaultMQPullConsumer rocketMQPullConsumer; + private Destination destination; + private String consumerGroup; + private String topicName; + private ConsumeModel consumeModel = ConsumeModel.SYNC; + + /** only support RMQ subExpression */ + private String messageSelector; + private ReceiveMessageHook hook = new ReceiveMessageHook(); + + /** + * If durable is true, consume message from the offset consumed last time. + * Otherwise consume from the max offset + */ + private boolean durable = false; + private boolean shared = false; + + private BlockingQueue<MessageWrapper> msgQueue = new ArrayBlockingQueue(PULL_BATCH_SIZE); + private volatile boolean pause = true; + private final long index = COUNTER.incrementAndGet(); + + private Map<MessageQueue, Long> offsetMap = new HashMap(); + + public DeliverMessageService(RocketMQConsumer consumer, Destination destination, String consumerGroup, + String messageSelector, boolean durable, boolean shared) { + this.consumer = consumer; + this.destination = destination; + this.consumerGroup = consumerGroup; + this.messageSelector = messageSelector; + this.durable = durable; + this.shared = shared; + + this.topicName = JMSUtils.getDestinationName(destination); + + createAndStartRocketMQPullConsumer(); + + if (this.consumer.getSession().getConnection().isStarted()) { + this.recover(); + } + else { + this.pause(); + } + } + + private void createAndStartRocketMQPullConsumer() { + final ClientConfig clientConfig = this.consumer.getSession().getConnection().getClientConfig(); + this.rocketMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); + this.rocketMQPullConsumer.setNamesrvAddr(clientConfig.getNamesrvAddr()); + this.rocketMQPullConsumer.setInstanceName(clientConfig.getInstanceName()); + this.rocketMQPullConsumer.setRegisterTopics(new HashSet(Arrays.asList(this.topicName))); + + try { + this.rocketMQPullConsumer.start(); + } + catch (MQClientException e) { + throw new JMSRuntimeException(format("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e))); + } + } + + @Override + public String getServiceName() { + return DeliverMessageService.class.getSimpleName() + "-" + this.index; + } + + @Override + public void run() { + while (!isStopped()) { + if (pause) { + this.waitForRunning(1000); + continue; + } + + try { + pullMessage(); + } + catch (InterruptedException e) { + log.debug("Pulling messages service has been interrupted"); + } + catch (Exception e) { + log.error("Error during pulling messages", e); + } + } + } + + private void pullMessage() throws Exception { + Set<MessageQueue> mqs = getMessageQueues(); + + for (MessageQueue mq : mqs) { + Long offset = offsetMap.get(mq); + if (offset == null) { + offset = beginOffset(mq); + } + PullResult pullResult = this.rocketMQPullConsumer.pullBlockIfNotFound(mq, this.messageSelector, offset, PULL_BATCH_SIZE); + + switch (pullResult.getPullStatus()) { + case FOUND: + List<MessageExt> msgs = pullResult.getMsgFoundList(); + offsetMap.put(mq, pullResult.getMaxOffset()); + for (MessageExt msg : msgs) { + handleMessage(msg, mq); + } + log.debug("Pull {} messages from topic:{},broker:{},queueId:{}", msgs.size(), mq.getTopic(), mq.getBrokerName(), mq.getQueueId()); + break; + case NO_NEW_MSG: + case NO_MATCHED_MSG: + break; + case OFFSET_ILLEGAL: + throw new JMSException("Error during pull message[reason:OFFSET_ILLEGAL]"); + } + } + } + + private Set<MessageQueue> getMessageQueues() throws MQClientException { + Set<MessageQueue> mqs = this.rocketMQPullConsumer.fetchSubscribeMessageQueues(this.topicName); + return mqs; + } + + /** + * Refer to {@link #durable}. + * + * @param mq message queue + * @return offset + * @throws MQClientException + */ + private Long beginOffset(MessageQueue mq) throws MQClientException { + return this.durable ? this.rocketMQPullConsumer.fetchConsumeOffset(mq, false) : this.rocketMQPullConsumer.maxOffset(mq); + } + + /** + * If {@link #consumeModel} is {@link ConsumeModel#ASYNC}, messages pulled from broker + * are handled in {@link ConsumeMessageService} owned by its session. + * + * If {@link #consumeModel} is {@link ConsumeModel#SYNC}, messages pulled from broker are put + * into a memory blocking queue, waiting for the {@link MessageConsumer#receive()} + * using {@link BlockingQueue#poll()} to handle messages synchronous. + * + * @param msg to handle message + * @throws InterruptedException + * @throws JMSException + */ + private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException { + Message jmsMessage = RMQ2JMSMessageConvert.convert(msg); + + try { + hook.before(jmsMessage); + } + catch (MessageExpiredException e) { + log.debug(e.getMessage()); + } + + final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset()); + + switch (this.consumeModel) { + case SYNC: + this.msgQueue.put(wrapper); + break; + case ASYNC: + this.consumer.getSession().getConsumeMessageService().put(wrapper); + break; + default: + throw new JMSException(format("Unsupported consume model[%s]", this.consumeModel)); + } + } + + public void ack(MessageQueue mq, Long offset) throws JMSException { + try { + this.rocketMQPullConsumer.updateConsumeOffset(mq, offset); + } + catch (MQClientException e) { + throw new JMSException(format("Fail to ack offset[mq:%s,offset:%s]", mq, offset)); + } + } + + public MessageWrapper poll() throws JMSException { + try { + return this.msgQueue.take(); + } + catch (InterruptedException e) { + throw new JMSException(e.getMessage()); + } + } + + public MessageWrapper poll(long timeout, TimeUnit timeUnit) throws JMSException { + try { + return this.msgQueue.poll(timeout, timeUnit); + } + catch (InterruptedException e) { + throw new JMSException(e.getMessage()); + } + } + + public void pause() { + this.pause = true; + } + + public void recover() { + this.pause = false; + } + + public void close() { + + this.stop(); + + this.rocketMQPullConsumer.shutdown(); + + this.shutdown(true); + + log.debug("Success to close message delivery service:{}", getServiceName()); + } + + public void setConsumeModel(ConsumeModel consumeModel) { + this.consumeModel = consumeModel; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java b/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java new file mode 100644 index 0000000..e322e5b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/MessageWrapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import javax.jms.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +public class MessageWrapper { + + private Message message; + private RocketMQConsumer consumer; + private MessageQueue mq; + private long offset; + + public MessageWrapper(Message message, RocketMQConsumer consumer, MessageQueue mq, long offset) { + this.message = message; + this.consumer = consumer; + this.mq = mq; + this.offset = offset; + } + + public Message getMessage() { + return message; + } + + public RocketMQConsumer getConsumer() { + return consumer; + } + + public MessageQueue getMq() { + return mq; + } + + public long getOffset() { + return offset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java new file mode 100644 index 0000000..e32a382 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.String.format; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; +import static javax.jms.Session.SESSION_TRANSACTED; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace; + +public class RocketMQConnection implements Connection { + + private static final Logger log = LoggerFactory.getLogger(RocketMQConnection.class); + + private String clientID; + private ClientConfig clientConfig; + private MQClientInstance clientInstance; + private String userName; + private String password; + + private List<RocketMQSession> sessionList = new ArrayList(); + private AtomicBoolean started = new AtomicBoolean(false); + + public RocketMQConnection(String nameServerAddress, String clientID, String instanceName, String userName, + String password) { + this.clientID = clientID; + this.userName = userName; + this.password = password; + + this.clientConfig = new ClientConfig(); + this.clientConfig.setNamesrvAddr(nameServerAddress); + this.clientConfig.setInstanceName(instanceName); + + startClientInstance(); + } + + private void startClientInstance() { + try { + // create a tcp connection to broker and some other background thread + this.clientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.clientConfig); + clientInstance.start(); + } + catch (MQClientException e) { + throw new JMSRuntimeException(format("Fail to startClientInstance connection object[namesrvAddr:%s,instanceName:%s]. Error message:%s", + this.clientConfig.getNamesrvAddr(), this.clientConfig.getInstanceName(), getStackTrace(e))); + } + } + + @Override + public Session createSession() throws JMSException { + return createSession(false, AUTO_ACKNOWLEDGE); + } + + @Override + public Session createSession(int sessionMode) throws JMSException { + if (sessionMode == SESSION_TRANSACTED) { + return createSession(true, Session.AUTO_ACKNOWLEDGE); + } + else { + return createSession(false, sessionMode); + } + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + //todo: support transacted and more acknowledge mode + if (transacted) { + throw new JMSException("Not support local transaction session"); + } + if (acknowledgeMode != AUTO_ACKNOWLEDGE) { + throw new JMSException("Only support AUTO_ACKNOWLEDGE mode now"); + } + + RocketMQSession session = new RocketMQSession(this, acknowledgeMode, transacted); + this.sessionList.add(session); + + return session; + } + + @Override + public ConnectionConsumer createConnectionConsumer(Destination destination, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public String getClientID() throws JMSException { + return this.clientID; + } + + @Override + public void setClientID(String clientID) throws JMSException { + if (isNotBlank(this.clientID)) { + throw new IllegalStateException("administratively client identifier has been configured."); + } + this.clientID = clientID; + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return RocketMQConnectionMetaData.instance(); + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void setExceptionListener(ExceptionListener listener) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void start() throws JMSException { + if (this.started.compareAndSet(false, true)) { + for (RocketMQSession session : sessionList) { + for (RocketMQConsumer consumer : session.getConsumerList()) { + consumer.getDeliverMessageService().recover(); + } + } + log.debug("Start connection successfully:{}", toString()); + } + } + + @Override + public void stop() throws JMSException { + if (this.started.compareAndSet(true, false)) { + for (RocketMQSession session : sessionList) { + for (RocketMQConsumer consumer : session.getConsumerList()) { + consumer.getDeliverMessageService().pause(); + } + } + log.debug("Stop connection successfully:{}", toString()); + } + } + + @Override + public void close() throws JMSException { + + for (RocketMQSession session : sessionList) { + session.close(); + } + + this.clientInstance.shutdown(); + + log.info("Success to close connection:{}", toString()); + } + + public boolean isStarted() { + return started.get(); + } + + public ClientConfig getClientConfig() { + return clientConfig; + } + + public String getUserName() { + return userName; + } + + @Override public String toString() { + return new ToStringBuilder(this) + .append("nameServerAddress", this.clientConfig.getNamesrvAddr()) + .append("instanceName", this.clientConfig.getInstanceName()) + .append("clientIdentifier", this.clientID) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java new file mode 100644 index 0000000..c81e8b5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implement of {@link ConnectionFactory} using RocketMQ client. + * + * <P>In RocketMQ, all producers and consumers interactive with broker + * by an {@link MQClientInstance} object, which encapsulates tcp connection, + * schedule task and so on. The best way to control the behavior of producers/consumers + * derived from a connection is to manipulate the {@link MQClientInstance} directly. + * + * <P>However, this object is not easy to access as it is maintained within RocketMQ Client. + * Fortunately another equivalent identifier called "instanceName" is provided. + * The "instanceName" is a one-to-one conception with {@link MQClientInstance} object. + * Just like there is a hash map,"instanceName" is the key and a {@link MQClientInstance} + * object is the value. So the essential keyword passed through all objects created by a + * connection is "instanceName". + */ +public class RocketMQConnectionFactory implements ConnectionFactory { + + private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionFactory.class); + + private String nameServerAddress; + + private String clientId; + + public RocketMQConnectionFactory(String nameServerAddress) { + this.nameServerAddress = nameServerAddress; + this.clientId = JMSUtils.uuid(); + } + + public RocketMQConnectionFactory(String nameServerAddress, String clientId) { + this.nameServerAddress = nameServerAddress; + this.clientId = clientId; + } + + @Override + public Connection createConnection() throws JMSException { + return createConnection(null, null); + } + + /** + * Using userName and Password to register a connection. Now access RMQ broker + * is anonymous and any userName/password is legal. + * + * @param userName ignored + * @param password ignored + * @return the new JMS Connection + * @throws JMSException + */ + @Override + public Connection createConnection(String userName, String password) throws JMSException { + return createRocketMQConnection(userName, password); + } + + private Connection createRocketMQConnection(String userName, String password) throws JMSException { + final String instanceName = JMSUtils.uuid(); + RocketMQConnection connection = new RocketMQConnection(this.nameServerAddress, this.clientId, instanceName, userName, password); + + log.info("Create a connection successfully[instanceName:{},clientIdentifier:{},userName:{}", instanceName, clientId, userName); + return connection; + } + + @Override + public JMSContext createContext() { + //todo: + return null; + } + + @Override + public JMSContext createContext(String userName, String password) { + //todo: + return null; + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) { + //todo: + return null; + } + + @Override + public JMSContext createContext(int sessionMode) { + //todo: + return null; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getNameServerAddress() { + return nameServerAddress; + } + + public void setNameServerAddress(String nameServerAddress) { + this.nameServerAddress = nameServerAddress; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java new file mode 100644 index 0000000..e4353e1 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConnectionMetaData.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.Enumeration; +import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.apache.rocketmq.jms.support.ProviderVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketMQConnectionMetaData implements ConnectionMetaData { + + private static final Logger log = LoggerFactory.getLogger(RocketMQConnectionMetaData.class); + private static final String PROVIDER_NAME = "Apache RocketMQ"; + + private String jmsVersion; + private int jmsMajorVersion; + private int jmsMinorVersion; + + private String providerVersion; + private int providerMajorVersion; + private int providerMinorVersion; + + private static RocketMQConnectionMetaData metaData = new RocketMQConnectionMetaData(); + + private RocketMQConnectionMetaData() { + Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); + + String jmsVersion = null; + int jmsMajor = 0; + int jmsMinor = 0; + try { + Package p = Package.getPackage("javax.jms"); + if (p != null) { + jmsVersion = p.getImplementationVersion(); + Matcher m = pattern.matcher(jmsVersion); + if (m.matches()) { + jmsMajor = Integer.parseInt(m.group(1)); + jmsMinor = Integer.parseInt(m.group(2)); + } + } + } + catch (Throwable e) { + log.error("Error during getting jms version", e); + } + + this.jmsVersion = jmsVersion; + this.jmsMajorVersion = jmsMajor; + this.jmsMinorVersion = jmsMinor; + + this.providerVersion = ProviderVersion.CURRENT_VERSION.name(); + this.providerMinorVersion = ProviderVersion.CURRENT_VERSION.getValue(); + this.providerMajorVersion = ProviderVersion.CURRENT_VERSION.getValue(); + } + + public static RocketMQConnectionMetaData instance() { + return metaData; + } + + public String getJMSVersion() throws JMSException { + return jmsVersion; + } + + public int getJMSMajorVersion() throws JMSException { + return jmsMajorVersion; + } + + public int getJMSMinorVersion() throws JMSException { + return jmsMinorVersion; + } + + public String getJMSProviderName() throws JMSException { + return PROVIDER_NAME; + } + + public String getProviderVersion() throws JMSException { + return providerVersion; + } + + public int getProviderMajorVersion() throws JMSException { + return providerMajorVersion; + } + + public int getProviderMinorVersion() throws JMSException { + return providerMinorVersion; + } + + public Enumeration<?> getJMSXPropertyNames() throws JMSException { + Vector<String> result = new Vector<String>(); + for (JMSPropertiesEnum e : JMSPropertiesEnum.values()) { + result.add(e.name()); + } + return result.elements(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java b/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java new file mode 100644 index 0000000..af147e0 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQConsumer.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketMQConsumer implements MessageConsumer { + + private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class); + private RocketMQSession session; + private Destination destination; + private String messageSelector; + private MessageListener messageListener; + private String subscriptionName; + private boolean durable; + private boolean shared; + + private DeliverMessageService deliverMessageService; + + public RocketMQConsumer(RocketMQSession session, Destination destination, + String messageSelector, + boolean durable, boolean shared) { + this(session, destination, messageSelector, UUID.randomUUID().toString(), durable, shared); + } + + public RocketMQConsumer(RocketMQSession session, Destination destination, + String messageSelector, + String subscriptionName, boolean durable, boolean shared) { + this.session = session; + this.destination = destination; + this.messageSelector = messageSelector; + this.subscriptionName = subscriptionName; + this.durable = durable; + this.shared = shared; + + String consumerGroup = JMSUtils.getConsumerGroup(this); + this.deliverMessageService = new DeliverMessageService(this, this.destination, consumerGroup, + this.messageSelector, this.durable, this.shared); + this.deliverMessageService.start(); + } + + @Override + public String getMessageSelector() throws JMSException { + return messageSelector; + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return this.messageListener; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + if (this.session.isSyncModel()) { + throw new JMSException("A asynchronously call is not permitted when a session is being used synchronously"); + } + + this.messageListener = listener; + this.deliverMessageService.setConsumeModel(ConsumeModel.ASYNC); + this.session.addAsyncConsumer(this); + } + + @Override + public Message receive() throws JMSException { + return this.receive(0); + } + + @Override + public Message receive(long timeout) throws JMSException { + if (this.session.isAsyncModel()) { + throw new JMSException("A synchronous call is not permitted when a session is being used asynchronously."); + } + + this.session.addSyncConsumer(this); + + if (timeout == 0) { + MessageWrapper wrapper = this.deliverMessageService.poll(); + wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); + return wrapper.getMessage(); + } + else { + MessageWrapper wrapper = this.deliverMessageService.poll(timeout, TimeUnit.MILLISECONDS); + if (wrapper == null) { + return null; + } + wrapper.getConsumer().getDeliverMessageService().ack(wrapper.getMq(), wrapper.getOffset()); + return wrapper.getMessage(); + } + } + + @Override + public Message receiveNoWait() throws JMSException { + return receive(1); + } + + @Override + public void close() throws JMSException { + this.deliverMessageService.close(); + } + + public void start() { + this.deliverMessageService.recover(); + } + + public void stop() { + this.deliverMessageService.pause(); + } + + public DeliverMessageService getDeliverMessageService() { + return deliverMessageService; + } + + public RocketMQSession getSession() { + return session; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public boolean isDurable() { + return durable; + } + + public boolean isShared() { + return shared; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java b/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java new file mode 100644 index 0000000..f5aa2d4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import java.util.UUID; +import javax.jms.CompletionListener; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; +import org.apache.rocketmq.jms.hook.SendMessageHook; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.String.format; +import static org.apache.commons.lang.exception.ExceptionUtils.getStackTrace; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.support.ObjectTypeCast.cast2Object; + +public class RocketMQProducer implements MessageProducer { + + private static final Logger log = LoggerFactory.getLogger(RocketMQProducer.class); + private RocketMQSession session; + private DefaultMQProducer rocketMQProducer; + private Destination destination; + + private boolean disableMessageID; + private boolean disableMessageTimestamp; + private long timeToLive = JMS_TIME_TO_LIVE_DEFAULT_VALUE; + private int deliveryMode = JMS_DELIVERY_MODE_DEFAULT_VALUE; + private int priority = JMS_PRIORITY_DEFAULT_VALUE; + private long deliveryDelay = JMS_DELIVERY_TIME_DEFAULT_VALUE; + + private SendMessageHook sendMessageHook; + + public RocketMQProducer() { + } + + public RocketMQProducer(RocketMQSession session, Destination destination) { + this.session = session; + this.destination = destination; + + this.rocketMQProducer = new DefaultMQProducer(UUID.randomUUID().toString()); + ClientConfig clientConfig = this.session.getConnection().getClientConfig(); + this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr()); + this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName()); + try { + this.rocketMQProducer.start(); + } + catch (MQClientException e) { + throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e))); + } + + this.sendMessageHook = new SendMessageHook(this); + } + + @Override + public void setDisableMessageID(boolean value) throws JMSException { + this.disableMessageID = value; + } + + @Override + public boolean getDisableMessageID() throws JMSException { + return this.disableMessageID; + } + + @Override + public void setDisableMessageTimestamp(boolean value) throws JMSException { + this.disableMessageTimestamp = value; + } + + @Override + public boolean getDisableMessageTimestamp() throws JMSException { + return this.disableMessageTimestamp; + } + + @Override + public void setDeliveryMode(int deliveryMode) throws JMSException { + throw new UnsupportDeliveryModelException(); + } + + @Override + public int getDeliveryMode() throws JMSException { + return this.deliveryMode; + } + + @Override + public void setPriority(int priority) throws JMSException { + this.priority = priority; + } + + @Override + public int getPriority() throws JMSException { + return this.priority; + } + + @Override + public void setTimeToLive(long timeToLive) throws JMSException { + this.timeToLive = timeToLive; + } + + @Override + public long getTimeToLive() throws JMSException { + return this.timeToLive; + } + + @Override + public void setDeliveryDelay(long deliveryDelay) throws JMSException { + this.deliveryDelay = deliveryDelay; + } + + @Override + public long getDeliveryDelay() throws JMSException { + return this.deliveryDelay; + } + + @Override + public Destination getDestination() throws JMSException { + return this.destination; + } + + @Override + public void close() throws JMSException { + this.rocketMQProducer.shutdown(); + } + + @Override + public void send(Message message) throws JMSException { + this.send(this.destination, message); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + this.send(this.destination, message, deliveryMode, priority, timeToLive); + } + + @Override + public void send(Destination destination, Message message) throws JMSException { + this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive()); + } + + @Override + public void send(Destination destination, Message message, int deliveryMode, int priority, + long timeToLive) throws JMSException { + + sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); + + MessageExt rmqMsg = createRocketMQMessage(message); + + SendResult sendResult = sendSync(rmqMsg); + if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { + log.debug("Success to send message[key={}]", rmqMsg.getKeys()); + return; + } + else { + throw new JMSException(format("Sending message error with result status:%s", sendResult.getSendStatus().name())); + } + } + + private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException { + + try { + return rocketMQProducer.send(rmqMsg); + } + catch (Exception e) { + throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); + } + } + + private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg, + CompletionListener completionListener) throws JMSException { + try { + rocketMQProducer.send(rmqMsg, new SendCompletionListener(completionListener)); + } + catch (Exception e) { + throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); + } + } + + private MessageExt createRocketMQMessage(Message jmsMsg) throws JMSException { + AbstractJMSMessage abstractJMSMessage = cast2Object(jmsMsg, AbstractJMSMessage.class); + try { + return JMS2RMQMessageConvert.convert(abstractJMSMessage); + } + catch (Exception e) { + throw new JMSException(format("Fail to convert to RocketMQ jmsMsg. Error: %s", getStackTrace(e))); + } + } + + @Override + public void send(Message message, CompletionListener completionListener) throws JMSException { + this.send(this.destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive, + CompletionListener completionListener) throws JMSException { + this.send(this.destination, message, deliveryMode, priority, timeToLive, completionListener); + } + + @Override + public void send(Destination destination, Message message, + CompletionListener completionListener) throws JMSException { + this.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), completionListener); + } + + @Override + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + CompletionListener completionListener) throws JMSException { + + sendMessageHook.before(message, destination, deliveryMode, priority, timeToLive); + + MessageExt rmqMsg = createRocketMQMessage(message); + + sendAsync(rmqMsg, completionListener); + } + + public RocketMQSession getSession() { + return session; + } + + public void setSession(RocketMQSession session) { + this.session = session; + } + + public void setRocketMQProducer(DefaultMQProducer rocketMQProducer) { + this.rocketMQProducer = rocketMQProducer; + } + + public void setDestination(Destination destination) { + this.destination = destination; + } + + public void setSendMessageHook(SendMessageHook sendMessageHook) { + this.sendMessageHook = sendMessageHook; + } + + public String getUserName() { + return this.session.getConnection().getUserName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java new file mode 100644 index 0000000..0094c47 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.jms.admin.AdminFactory; +import org.apache.rocketmq.jms.destination.RocketMQQueue; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.jms.Constant.DEFAULT_DURABLE; +import static org.apache.rocketmq.jms.Constant.DEFAULT_NO_LOCAL; +import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR; + +/** + * Implement of {@link Session}. + */ +public class RocketMQSession implements Session { + + private static final Logger log = LoggerFactory.getLogger(RocketMQSession.class); + + private RocketMQConnection connection; + + private int acknowledgeMode; + + private boolean transacted; + + private ConsumeMessageService consumeMessageService; + + private final List<RocketMQProducer> producerList = new ArrayList(); + + private final List<RocketMQConsumer> consumerList = new ArrayList(); + + private final Set<RocketMQConsumer> asyncConsumerSet = new HashSet(); + + private final Set<RocketMQConsumer> syncConsumerSet = new HashSet(); + + public RocketMQSession(RocketMQConnection connection, int acknowledgeMode, boolean transacted) { + this.connection = connection; + this.acknowledgeMode = acknowledgeMode; + this.transacted = transacted; + + this.consumeMessageService = new ConsumeMessageService(this); + this.consumeMessageService.start(); + } + + @Override + public BytesMessage createBytesMessage() throws JMSException { + return new JMSBytesMessage(); + } + + @Override + public MapMessage createMapMessage() throws JMSException { + return new JMSMapMessage(); + } + + @Override + public Message createMessage() throws JMSException { + return new JMSBytesMessage(); + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + return new JMSObjectMessage(); + } + + @Override + public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { + return new JMSObjectMessage(serializable); + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public TextMessage createTextMessage() throws JMSException { + return new JMSTextMessage(); + } + + @Override + public TextMessage createTextMessage(String text) throws JMSException { + return new JMSTextMessage(text); + } + + @Override + public boolean getTransacted() throws JMSException { + return this.transacted; + } + + @Override + public int getAcknowledgeMode() throws JMSException { + return this.acknowledgeMode; + } + + @Override + public void commit() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void rollback() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void close() throws JMSException { + for (RocketMQProducer producer : this.producerList) { + producer.close(); + } + for (RocketMQConsumer consumer : this.consumerList) { + consumer.close(); + } + } + + @Override + public void recover() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void run() { + //todo + throw new JMSRuntimeException("Not support yet"); + } + + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + RocketMQProducer producer = new RocketMQProducer(this, destination); + this.producerList.add(producer); + return producer; + } + + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + return createConsumer(destination, NO_MESSAGE_SELECTOR); + } + + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + return createConsumer(destination, messageSelector, DEFAULT_NO_LOCAL); + } + + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector, + boolean noLocal) throws JMSException { + + // ignore noLocal param as RMQ not support + RocketMQConsumer consumer = new RocketMQConsumer(this, destination, messageSelector, DEFAULT_DURABLE, false); + this.consumerList.add(consumer); + + return consumer; + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException { + return createSharedConsumer(topic, sharedSubscriptionName, NO_MESSAGE_SELECTOR); + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, + String messageSelector) throws JMSException { + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, sharedSubscriptionName, DEFAULT_DURABLE, true); + this.consumerList.add(consumer); + + return consumer; + } + + @Override + public Queue createQueue(String queueName) throws JMSException { + return new RocketMQQueue(queueName); + } + + @Override + public Topic createTopic(String topicName) throws JMSException { + Preconditions.checkNotNull(topicName); + + return new RocketMQTopic(topicName); + } + + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + return createDurableSubscriber(topic, name, NO_MESSAGE_SELECTOR, DEFAULT_NO_LOCAL); + } + + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, + boolean noLocal) throws JMSException { + RocketMQTopicSubscriber subscriber = new RocketMQTopicSubscriber(this, topic, messageSelector, name, true, true); + this.consumerList.add(subscriber); + + return subscriber; + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException { + return createDurableConsumer(topic, name, NO_MESSAGE_SELECTOR, true); + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, + boolean noLocal) throws JMSException { + DefaultMQAdminExt admin = AdminFactory.getAdmin(this.getConnection().getClientConfig().getNamesrvAddr()); + try { + GroupList groupList = admin.queryTopicConsumeByWho(topic.getTopicName()); + if (groupList.getGroupList().contains(JMSUtils.getConsumerGroup(name, this.getConnection().getClientID(), false))) { + throw new DuplicateSubscriptionException("The same subscription( join subscriptionName with clientID) has existed, so couldn't create consumer on them again "); + } + } + catch (InterruptedException | MQBrokerException | RemotingException | MQClientException e) { + throw new JMSException(ExceptionUtils.getStackTrace(e)); + } + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, false); + this.consumerList.add(consumer); + + return consumer; + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException { + return createSharedDurableConsumer(topic, name, NO_MESSAGE_SELECTOR); + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name, + String messageSelector) throws JMSException { + RocketMQConsumer consumer = new RocketMQConsumer(this, topic, messageSelector, name, true, true); + this.consumerList.add(consumer); + + return consumer; + } + + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + @Override + public void unsubscribe(String name) throws JMSException { + //todo + throw new JMSException("Not support yet"); + } + + public List<RocketMQProducer> getProducerList() { + return producerList; + } + + public List<RocketMQConsumer> getConsumerList() { + return consumerList; + } + + public RocketMQConnection getConnection() { + return connection; + } + + public boolean isTransacted() { + return transacted; + } + + public void addSyncConsumer(RocketMQConsumer consumer) { + this.syncConsumerSet.add(consumer); + } + + public void addAsyncConsumer(RocketMQConsumer consumer) { + this.asyncConsumerSet.add(consumer); + } + + public boolean isAsyncModel() { + return !this.asyncConsumerSet.isEmpty(); + } + + public boolean isSyncModel() { + return !this.syncConsumerSet.isEmpty(); + } + + public ConsumeMessageService getConsumeMessageService() { + return consumeMessageService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java b/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java new file mode 100644 index 0000000..51b732b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/RocketMQTopicSubscriber.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import javax.jms.JMSException; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +public class RocketMQTopicSubscriber extends RocketMQConsumer implements TopicSubscriber { + + private Topic topic; + + public RocketMQTopicSubscriber(RocketMQSession session, Topic topic, String messageSelector, + String sharedSubscriptionName, boolean durable, boolean shared) { + super(session, topic, messageSelector, sharedSubscriptionName, durable, shared); + this.topic = topic; + } + + @Override + public Topic getTopic() throws JMSException { + return this.topic; + } + + @Override + public boolean getNoLocal() throws JMSException { + //todo: not inhibit now + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java b/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java new file mode 100644 index 0000000..c2ea161 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/SendCompletionListener.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import javax.jms.CompletionListener; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; + +public class SendCompletionListener implements SendCallback { + + private CompletionListener completionListener; + + public SendCompletionListener(CompletionListener completionListener) { + this.completionListener = completionListener; + } + + @Override + public void onSuccess(SendResult sendResult) { + //todo: how to transmit message into + this.completionListener.onCompletion(null); + } + + @Override + public void onException(Throwable e) { + //todo: how to transmit message into + this.completionListener.onException(null, new Exception(e)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java b/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java new file mode 100644 index 0000000..c551a22 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/admin/AdminFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.admin; + +import java.util.concurrent.ConcurrentHashMap; +import javax.jms.JMSException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.jms.exception.JMSClientException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +public class AdminFactory { + + private static ConcurrentHashMap<String/*nameServerAddress*/, DefaultMQAdminExt> admins = new ConcurrentHashMap(); + + public static DefaultMQAdminExt getAdmin(String nameServerAddress) throws JMSException { + if (nameServerAddress == null) { + throw new IllegalArgumentException("NameServerAddress could be null"); + } + + DefaultMQAdminExt admin = admins.get(nameServerAddress); + if (admin != null) { + return admin; + } + + admin = new DefaultMQAdminExt(nameServerAddress); + try { + admin.start(); + } + catch (MQClientException e) { + throw new JMSClientException("Error during starting admin client"); + } + DefaultMQAdminExt old = admins.putIfAbsent(nameServerAddress, admin); + if (old != null) { + admin.shutdown(); + return old; + } + + return admin; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java new file mode 100644 index 0000000..d7d5b84 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQQueue.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import javax.jms.JMSException; +import javax.jms.Queue; + +public class RocketMQQueue implements Queue { + + private String name; + + public RocketMQQueue(String name) { + this.name = name; + } + + @Override + public String getQueueName() throws JMSException { + return this.name; + } + + @Override public String toString() { + return this.name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java new file mode 100644 index 0000000..3214b4c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/destination/RocketMQTopic.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.destination; + +import javax.jms.JMSException; +import javax.jms.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RocketMQTopic implements Topic { + private static final Logger log = LoggerFactory.getLogger(RocketMQTopic.class); + + private String name; + + public RocketMQTopic(String name) { + this.name = name; + } + + @Override + public String getTopicName() throws JMSException { + return this.name; + } + + @Override + public String toString() { + return this.name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java b/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java new file mode 100644 index 0000000..51ce57d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/exception/DuplicateSubscriptionException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSException; + +public class DuplicateSubscriptionException extends JMSException { + + public DuplicateSubscriptionException(String reason, String errorCode) { + super(reason, errorCode); + } + + public DuplicateSubscriptionException(String reason) { + super(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java b/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java new file mode 100644 index 0000000..1335eb9 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/exception/JMSClientException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSException; + +public class JMSClientException extends JMSException { + + public JMSClientException(String reason, String errorCode) { + super(reason, errorCode); + } + + public JMSClientException(String reason) { + super(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java b/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java new file mode 100644 index 0000000..d35b7b1 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/exception/MessageExpiredException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSException; + +public class MessageExpiredException extends JMSException { + + public MessageExpiredException(String reason, String errorCode) { + super(reason, errorCode); + } + + public MessageExpiredException(String reason) { + super(reason); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java b/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java new file mode 100644 index 0000000..903b75f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/exception/UnsupportDeliveryModelException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.exception; + +import javax.jms.JMSRuntimeException; + +public class UnsupportDeliveryModelException extends JMSRuntimeException { + + public UnsupportDeliveryModelException() { + super("Only support PERSISTENT model, and guarantee at-least-once"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java b/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java new file mode 100644 index 0000000..2bf9bd3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/hook/ReceiveMessageHook.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.rocketmq.jms.exception.MessageExpiredException; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; + +public class ReceiveMessageHook { + + public void before(Message message) throws JMSException { + + validate(message); + + setProviderProperties(message); + } + + private void validate(Message message) throws JMSException { + if (message.getJMSExpiration() != 0 && System.currentTimeMillis() > message.getJMSExpiration()) { + throw new MessageExpiredException(String.format("This message[id=%s] has been expired", message.getJMSMessageID())); + } + } + + public void setProviderProperties(Message message) throws JMSException { + //JMSXRcvTimestamp + message.setLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name(), System.currentTimeMillis()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java b/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java new file mode 100644 index 0000000..0429973 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/jms/hook/SendMessageHook.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import java.util.UUID; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.rocketmq.jms.RocketMQProducer; +import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; + +import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX; +import static org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum.JMSXUserID; + +/** + * Hook that executes before sending message. + */ +public class SendMessageHook { + + private RocketMQProducer producer; + + public SendMessageHook() { + } + + public SendMessageHook(RocketMQProducer producer) { + this.producer = producer; + } + + public void before(Message message, Destination destination, int deliveryMode, int priority, + long timeToLive) throws JMSException { + + validate(deliveryMode); + + setProviderHeader(message, destination, deliveryMode, priority, timeToLive); + + setProviderProperties(message); + } + + private void setProviderHeader(Message message, Destination destination, int deliveryMode, int priority, + long timeToLive) throws JMSException { + // destination + message.setJMSDestination(destination); + + // delivery mode + message.setJMSDeliveryMode(deliveryMode); + + // expiration + if (timeToLive != 0) { + message.setJMSExpiration(System.currentTimeMillis() + timeToLive); + } + else { + message.setJMSExpiration(0L); + } + + // delivery time + message.setJMSDeliveryTime(message.getJMSTimestamp() + this.producer.getDeliveryDelay()); + + // priority + message.setJMSPriority(priority); + + // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used. + if (!this.producer.getDisableMessageID()) { + message.setJMSMessageID(new StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().toString()).toString()); + } + + // timestamp + if (!this.producer.getDisableMessageTimestamp()) { + message.setJMSTimestamp(System.currentTimeMillis()); + } + } + + private void validate(int deliveryMode) { + if (deliveryMode != JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE) { + throw new UnsupportDeliveryModelException(); + } + } + + public void setProviderProperties(Message message) throws JMSException { + // JMSXUserID + if (this.producer.getUserName() != null) { + message.setStringProperty(JMSXUserID.name(), this.producer.getUserName()); + } + } + + public void setProducer(RocketMQProducer producer) { + this.producer = producer; + } +}