http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java new file mode 100644 index 0000000..880960f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableListener.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.MessageConsumer; + + +/** + * Internal listener that is notified when a consumer has a message available. + */ +public interface JmsMessageAvailableListener { + + /** + * Called when a Message is available to be received by a client. + * + * @param consumer + * the MessageConsumer instance that has a message available. + */ + public void onMessageAvailable(MessageConsumer consumer); + +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java new file mode 100644 index 0000000..07cba2a --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -0,0 +1,509 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.util.FifoMessageQueue; +import org.apache.qpid.jms.util.MessageQueue; +import org.apache.qpid.jms.util.PriorityMessageQueue; + +/** + * implementation of a JMS Message Consumer + */ +public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher { + + protected final JmsSession session; + protected final JmsConnection connection; + protected JmsConsumerInfo consumerInfo; + protected final int acknowledgementMode; + protected final AtomicBoolean closed = new AtomicBoolean(); + protected boolean started; + protected MessageListener messageListener; + protected JmsMessageAvailableListener availableListener; + protected final MessageQueue messageQueue; + protected final Lock lock = new ReentrantLock(); + protected final AtomicBoolean suspendedConnection = new AtomicBoolean(); + protected final AtomicBoolean delivered = new AtomicBoolean(); + + /** + * Create a non-durable MessageConsumer + * + * @param consumerId + * @param session + * @param destination + * @param selector + * @param noLocal + * @throws JMSException + */ + protected 
JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, + String selector, boolean noLocal) throws JMSException { + this(consumerId, session, destination, null, selector, noLocal); + } + + /** + * Create a MessageConsumer which could be durable. + * + * @param consumerId + * @param session + * @param destination + * @param name + * @param selector + * @param noLocal + * @throws JMSException + */ + protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination, + String name, String selector, boolean noLocal) throws JMSException { + this.session = session; + this.connection = session.getConnection(); + this.acknowledgementMode = session.acknowledgementMode(); + + if (connection.isMessagePrioritySupported()) { + this.messageQueue = new PriorityMessageQueue(); + } else { + this.messageQueue = new FifoMessageQueue(); + } + + JmsPrefetchPolicy policy = this.connection.getPrefetchPolicy(); + + this.consumerInfo = new JmsConsumerInfo(consumerId); + this.consumerInfo.setClientId(connection.getClientID()); + this.consumerInfo.setSelector(selector); + this.consumerInfo.setSubscriptionName(name); + this.consumerInfo.setDestination(destination); + this.consumerInfo.setAcknowledgementMode(acknowledgementMode); + this.consumerInfo.setNoLocal(noLocal); + this.consumerInfo.setBrowser(isBrowser()); + this.consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy)); + + try { + this.consumerInfo = session.getConnection().createResource(consumerInfo); + } catch (JMSException ex) { + throw ex; + } + } + + public void init() throws JMSException { + session.add(this); + try { + session.getConnection().startResource(consumerInfo); + } catch (JMSException ex) { + session.remove(this); + throw ex; + } + } + + /** + * @throws JMSException + * @see javax.jms.MessageConsumer#close() + */ + @Override + public void close() throws JMSException { + if (!closed.get()) { + if (delivered.get() && 
session.getTransactionContext().isInTransaction()) { + session.getTransactionContext().addSynchronization(new JmsTxSynchronization() { + @Override + public void afterCommit() throws Exception { + doClose(); + } + + @Override + public void afterRollback() throws Exception { + doClose(); + } + }); + } else { + doClose(); + } + } + } + + /** + * Called to initiate shutdown of Consumer resources and request that the remote + * peer remove the registered consumer. + * + * @throws JMSException + */ + protected void doClose() throws JMSException { + shutdown(); + this.connection.destroyResource(consumerInfo); + } + + /** + * Called to release all consumer resources without requiring a destroy request + * to be sent to the remote peer. This is most commonly needed when the parent + * Session is closing. + * + * @throws JMSException + */ + protected void shutdown() throws JMSException { + if (closed.compareAndSet(false, true)) { + this.session.remove(this); + } + } + + /** + * @return a Message or null if closed during the operation + * @throws JMSException + * @see javax.jms.MessageConsumer#receive() + */ + @Override + public Message receive() throws JMSException { + checkClosed(); + checkMessageListener(); + sendPullCommand(0); + + try { + return copy(ack(this.messageQueue.dequeue(-1))); + } catch (Exception e) { + throw JmsExceptionSupport.create(e); + } + } + + /** + * @param timeout + * @return a Message or null + * @throws JMSException + * @see javax.jms.MessageConsumer#receive(long) + */ + @Override + public Message receive(long timeout) throws JMSException { + checkClosed(); + checkMessageListener(); + sendPullCommand(timeout); + + if (timeout > 0) { + try { + return copy(ack(this.messageQueue.dequeue(timeout))); + } catch (InterruptedException e) { + throw JmsExceptionSupport.create(e); + } + } + + return null; + } + + /** + * @return a Message or null + * @throws JMSException + * @see javax.jms.MessageConsumer#receiveNoWait() + */ + @Override + public Message 
receiveNoWait() throws JMSException { + checkClosed(); + checkMessageListener(); + sendPullCommand(-1); + + return copy(ack(this.messageQueue.dequeueNoWait())); + } + + protected void checkClosed() throws IllegalStateException { + if (this.closed.get()) { + throw new IllegalStateException("The MessageConsumer is closed"); + } + } + + JmsMessage copy(final JmsInboundMessageDispatch envelope) throws JMSException { + if (envelope == null || envelope.getMessage() == null) { + return null; + } + return envelope.getMessage().copy(); + } + + JmsInboundMessageDispatch ack(final JmsInboundMessageDispatch envelope) throws JMSException { + if (envelope != null && envelope.getMessage() != null) { + JmsMessage message = envelope.getMessage(); + if (message.getAcknowledgeCallback() != null || session.isTransacted()) { + // Message has been received by the app.. expand the credit + // window so that we receive more messages. + session.acknowledge(envelope, ACK_TYPE.DELIVERED); + } else { + doAck(envelope); + } + // Tags that we have delivered and can't close if in a TX Session. + delivered.set(true); + } + return envelope; + } + + private void doAck(final JmsInboundMessageDispatch envelope) throws JMSException { + checkClosed(); + try { + session.acknowledge(envelope, ACK_TYPE.CONSUMED); + } catch (JMSException ex) { + session.onException(ex); + throw ex; + } + } + + /** + * Called from the session when a new Message has been dispatched to this Consumer + * from the connection. + * + * @param envelope + * the dispatch envelope carrying the newly arrived message. 
+ */ + @Override + public void onMessage(final JmsInboundMessageDispatch envelope) { + lock.lock(); + try { + if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) { + envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (session.isClosed()) { + throw new javax.jms.IllegalStateException("Session closed."); + } + session.acknowledge(); + return null; + } + }); + } + this.messageQueue.enqueue(envelope); + } finally { + lock.unlock(); + } + + if (this.messageListener != null && this.started) { + session.getExecutor().execute(new Runnable() { + @Override + public void run() { + JmsInboundMessageDispatch envelope; + while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) { + try { + messageListener.onMessage(copy(ack(envelope))); + } catch (Exception e) { + session.getConnection().onException(e); + } + } + } + }); + } else { + if (availableListener != null) { + availableListener.onMessageAvailable(this); + } + } + } + + public void start() { + lock.lock(); + try { + this.started = true; + this.messageQueue.start(); + drainMessageQueueToListener(); + } finally { + lock.unlock(); + } + } + + public void stop() { + lock.lock(); + try { + this.started = false; + this.messageQueue.stop(); + } finally { + lock.unlock(); + } + } + + void drainMessageQueueToListener() { + MessageListener listener = this.messageListener; + if (listener != null) { + if (!this.messageQueue.isEmpty()) { + List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll(); + for (JmsInboundMessageDispatch envelope : drain) { + try { + listener.onMessage(copy(ack(envelope))); + } catch (Exception e) { + session.getConnection().onException(e); + } + } + drain.clear(); + } + } + } + + /** + * @return the id + */ + public JmsConsumerId getConsumerId() { + return this.consumerInfo.getConsumerId(); + } + + /** + * @return the Destination + */ + public JmsDestination getDestination() { + return 
this.consumerInfo.getDestination(); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + checkClosed(); + return this.messageListener; + } + + /** + * @param listener the MessageListener to install; messages already queued are drained to it + * @throws JMSException if the consumer is closed or its prefetch size is zero + * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) + */ + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + checkClosed(); + if (consumerInfo.getPrefetchSize() == 0) { + throw new JMSException("Illegal prefetch size of zero. This setting is not supported " + + "for asynchronous consumers please set a value of at least 1"); + } + this.messageListener = listener; + drainMessageQueueToListener(); + } + + /** + * @return the Message Selector + * @throws JMSException + * @see javax.jms.MessageConsumer#getMessageSelector() + */ + @Override + public String getMessageSelector() throws JMSException { + checkClosed(); + return this.consumerInfo.getSelector(); + } + + /** + * Gets the configured prefetch size for this consumer. + * @return the prefetch size configuration for this consumer. 
+ */ + public int getPrefetchSize() { + return this.consumerInfo.getPrefetchSize(); + } + + protected void checkMessageListener() throws JMSException { + session.checkMessageListener(); + } + + boolean hasMessageListener() { + return this.messageListener != null; + } + + boolean isUsingDestination(JmsDestination destination) { + return this.consumerInfo.getDestination().equals(destination); + } + + protected int getMessageQueueSize() { + return this.messageQueue.size(); + } + + public boolean getNoLocal() throws IllegalStateException { + return this.consumerInfo.isNoLocal(); + } + + public boolean isDurableSubscription() { + return false; + } + + public boolean isBrowser() { + return false; + } + + @Override + public void setAvailableListener(JmsMessageAvailableListener availableListener) { + this.availableListener = availableListener; + } + + @Override + public JmsMessageAvailableListener getAvailableListener() { + return availableListener; + } + + protected void onConnectionInterrupted() { + messageQueue.clear(); + } + + protected void onConnectionRecovery(Provider provider) throws Exception { + ProviderFuture request = new ProviderFuture(); + provider.create(consumerInfo, request); + request.sync(); + } + + protected void onConnectionRecovered(Provider provider) throws Exception { + ProviderFuture request = new ProviderFuture(); + provider.start(consumerInfo, request); + request.sync(); + } + + protected void onConnectionRestored() { + } + + /** + * Triggers a pull request from the connected Provider. An attempt is made to set + * a timeout on the pull request however some providers will not honor this value + * and the pull will remain active until a message is dispatched. + * + * The timeout value can be one of: + * + * < 0 to indicate that the request should expire immediately if no message. + * = 0 to indicate that the request should never time out. + * > 0 to indicate that the request should expire after the given time in milliseconds. 
+ * + * @param timeout + * The amount of time the pull request should remain valid. + */ + protected void sendPullCommand(long timeout) throws JMSException { + if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) { + connection.pull(getConsumerId(), timeout); + } + } + + private int getConfiguredPrefetch(JmsDestination destination, JmsPrefetchPolicy policy) { + int prefetch = 0; + if (destination.isTopic()) { + if (isDurableSubscription()) { + prefetch = policy.getDurableTopicPrefetch(); + } else { + prefetch = policy.getTopicPrefetch(); + } + } else { + if (isBrowser()) { + prefetch = policy.getQueueBrowserPrefetch(); + } else { + prefetch = policy.getQueuePrefetch(); + } + } + + return prefetch; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java new file mode 100644 index 0000000..602e8b0 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageDispatcher.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; + +public interface JmsMessageDispatcher { + + /** + * Called when a new Message delivery is in progress. + * + * @param envelope + * the incoming message dispatch information. + */ + void onMessage(JmsInboundMessageDispatch envelope); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java new file mode 100644 index 0000000..4d09c04 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; + +import org.apache.qpid.jms.message.JmsMessageTransformation; +import org.apache.qpid.jms.meta.JmsProducerId; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFuture; + +/** + * Implementation of a Jms MessageProducer + */ +public class JmsMessageProducer implements MessageProducer { + + protected final JmsSession session; + protected final JmsConnection connection; + protected JmsProducerInfo producerInfo; + protected final boolean flexibleDestination; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected int priority = Message.DEFAULT_PRIORITY; + protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE; + protected final AtomicBoolean closed = new AtomicBoolean(); + protected boolean disableMessageId; + protected boolean disableTimestamp; + protected final AtomicLong messageSequence = new AtomicLong(); + + protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException { + this.session = session; + this.connection = session.getConnection(); + this.flexibleDestination = destination == null; + this.producerInfo = new JmsProducerInfo(producerId); + this.producerInfo.setDestination(destination); + this.producerInfo = session.getConnection().createResource(producerInfo); + } + + /** + * Close the producer + * + * @throws JMSException + * + * @see javax.jms.MessageProducer#close() + */ + @Override + public void close() throws JMSException { + if (!closed.get()) { + doClose(); + } + } + + /** + * Called to initiate shutdown of 
Producer resources and request that the remote + * peer remove the registered producer. + * + * @throws JMSException + */ + protected void doClose() throws JMSException { + shutdown(); + this.connection.destroyResource(producerInfo); + } + + /** + * Called to release all producer resources without requiring a destroy request + * to be sent to the remote peer. This is most commonly needed when the parent + * Session is closing. + * + * @throws JMSException + */ + protected void shutdown() throws JMSException { + if (closed.compareAndSet(false, true)) { + this.session.remove(this); + } + } + + /** + * @return the delivery mode + * @throws JMSException + * @see javax.jms.MessageProducer#getDeliveryMode() + */ + @Override + public int getDeliveryMode() throws JMSException { + checkClosed(); + return this.deliveryMode; + } + + /** + * @return the destination + * @throws JMSException + * @see javax.jms.MessageProducer#getDestination() + */ + @Override + public Destination getDestination() throws JMSException { + checkClosed(); + return this.producerInfo.getDestination(); + } + + /** + * @return true if disableIds is set + * @throws JMSException + * @see javax.jms.MessageProducer#getDisableMessageID() + */ + @Override + public boolean getDisableMessageID() throws JMSException { + checkClosed(); + return this.disableMessageId; + } + + /** + * @return true if disable timestamp is set + * @throws JMSException + * @see javax.jms.MessageProducer#getDisableMessageTimestamp() + */ + @Override + public boolean getDisableMessageTimestamp() throws JMSException { + checkClosed(); + return this.disableTimestamp; + } + + /** + * @return the priority + * @throws JMSException + * @see javax.jms.MessageProducer#getPriority() + */ + @Override + public int getPriority() throws JMSException { + checkClosed(); + return this.priority; + } + + /** + * @return timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#getTimeToLive() + */ + @Override + public long getTimeToLive() 
throws JMSException { + checkClosed(); + return this.timeToLive; + } + + /** + * @param message + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Message) + */ + @Override + public void send(Message message) throws JMSException { + send(producerInfo.getDestination(), message, this.deliveryMode, this.priority, this.timeToLive); + } + + /** + * @param destination + * @param message + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Destination, + * javax.jms.Message) + */ + @Override + public void send(Destination destination, Message message) throws JMSException { + send(destination, message, this.deliveryMode, this.priority, this.timeToLive); + } + + /** + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) + */ + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + send(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive); + } + + /** + * @param destination + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Destination, + * javax.jms.Message, int, int, long) + */ + @Override + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkClosed(); + + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) { + throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName()); + } + + this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp); + } + + /** + * 
@param deliveryMode + * @throws JMSException + * @see javax.jms.MessageProducer#setDeliveryMode(int) + */ + @Override + public void setDeliveryMode(int deliveryMode) throws JMSException { + checkClosed(); + this.deliveryMode = deliveryMode; + } + + /** + * @param value + * @throws JMSException + * @see javax.jms.MessageProducer#setDisableMessageID(boolean) + */ + @Override + public void setDisableMessageID(boolean value) throws JMSException { + checkClosed(); + this.disableMessageId = value; + } + + /** + * @param value + * @throws JMSException + * @see javax.jms.MessageProducer#setDisableMessageTimestamp(boolean) + */ + @Override + public void setDisableMessageTimestamp(boolean value) throws JMSException { + checkClosed(); + this.disableTimestamp = value; + } + + /** + * @param defaultPriority + * @throws JMSException + * @see javax.jms.MessageProducer#setPriority(int) + */ + @Override + public void setPriority(int defaultPriority) throws JMSException { + checkClosed(); + this.priority = defaultPriority; + } + + /** + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#setTimeToLive(long) + */ + @Override + public void setTimeToLive(long timeToLive) throws JMSException { + checkClosed(); + this.timeToLive = timeToLive; + } + + /** + * @param destination + * the destination to set + * @throws JMSException + * @throws InvalidDestinationException + */ + public void setDestination(Destination destination) throws JMSException { + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + if (!this.flexibleDestination && !destination.equals(producerInfo.getDestination())) { + throw new UnsupportedOperationException("This producer can only send messages to: " + producerInfo.getDestination().getName()); + } + producerInfo.setDestination(JmsMessageTransformation.transformDestination(session.getConnection(), destination)); + } + + /** + * @return the producer's assigned JmsProducerId. 
+ */ + protected JmsProducerId getProducerId() { + return this.producerInfo.getProducerId(); + } + + /** + * @return the next logical sequence for a Message sent from this Producer. + */ + protected long getNextMessageSequence() { + return this.messageSequence.incrementAndGet(); + } + + protected void checkClosed() throws IllegalStateException { + if (closed.get()) { + throw new IllegalStateException("The MessageProducer is closed"); + } + } + + //////////////////////////////////////////////////////////////////////////// + // Connection interruption handlers. + //////////////////////////////////////////////////////////////////////////// + + protected void onConnectionInterrupted() { + } + + protected void onConnectionRecovery(Provider provider) throws Exception { + ProviderFuture request = new ProviderFuture(); + provider.create(producerInfo, request); + request.sync(); + } + + protected void onConnectionRecovered(Provider provider) throws Exception { + } + + protected void onConnectionRestored() { + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java new file mode 100644 index 0000000..c1212f2 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPrefetchPolicy.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.io.Serializable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Defines the prefetch message policies for different types of consumers + */ +public class JmsPrefetchPolicy extends Object implements Serializable { + + private static final long serialVersionUID = 5298685386681646744L; + + public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE; + public static final int DEFAULT_QUEUE_PREFETCH = 1000; + public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; + public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; + public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE; + + private static final Logger LOG = LoggerFactory.getLogger(JmsPrefetchPolicy.class); + + private int queuePrefetch; + private int queueBrowserPrefetch; + private int topicPrefetch; + private int durableTopicPrefetch; + private int maxPrefetchSize = MAX_PREFETCH_SIZE; + + /** + * Initialize default prefetch policies + */ + public JmsPrefetchPolicy() { + this.queuePrefetch = DEFAULT_QUEUE_PREFETCH; + this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH; + this.topicPrefetch = DEFAULT_TOPIC_PREFETCH; + this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH; + } + + /** + * Creates a new JmsPrefetchPolicy instance copied from the source policy. + * + * @param source + * The policy instance to copy values from. 
+ */ + public JmsPrefetchPolicy(JmsPrefetchPolicy source) { + /* Copy the cap too, otherwise the duplicate silently reverts to the default limit. */ + this.maxPrefetchSize = source.getMaxPrefetchSize(); + this.queuePrefetch = source.getQueuePrefetch(); + this.queueBrowserPrefetch = source.getQueueBrowserPrefetch(); + this.topicPrefetch = source.getTopicPrefetch(); + this.durableTopicPrefetch = source.getDurableTopicPrefetch(); + } + + /** + * @return Returns the durableTopicPrefetch. + */ + public int getDurableTopicPrefetch() { + return durableTopicPrefetch; + } + + /** + * Sets the durable topic prefetch value, this value is limited by the max + * prefetch size setting. + * + * @param durableTopicPrefetch + * The durableTopicPrefetch to set. + */ + public void setDurableTopicPrefetch(int durableTopicPrefetch) { + this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch); + } + + /** + * @return Returns the queuePrefetch. + */ + public int getQueuePrefetch() { + return queuePrefetch; + } + + /** + * @param queuePrefetch + * The queuePrefetch to set, limited by the max prefetch size setting. + */ + public void setQueuePrefetch(int queuePrefetch) { + this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch); + } + + /** + * @return Returns the queueBrowserPrefetch. + */ + public int getQueueBrowserPrefetch() { + return queueBrowserPrefetch; + } + + /** + * @param queueBrowserPrefetch + * The queueBrowserPrefetch to set, limited by the max prefetch size setting. + */ + public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { + this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch); + } + + /** + * @return Returns the topicPrefetch. + */ + public int getTopicPrefetch() { + return topicPrefetch; + } + + /** + * @param topicPrefetch + * The topicPrefetch to set, limited by the max prefetch size setting. + */ + public void setTopicPrefetch(int topicPrefetch) { + this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); + } + + /** + * Gets the currently configured max prefetch size value. + * @return the currently configured max prefetch value. + */ + public int getMaxPrefetchSize() { + return maxPrefetchSize; + } + + /** + * Sets the maximum prefetch size value. 
+ * + * @param maxPrefetchSize + * The maximum allowed value for any of the prefetch size options. + */ + public void setMaxPrefetchSize(int maxPrefetchSize) { + this.maxPrefetchSize = maxPrefetchSize; + } + + /** + * Sets the prefetch values for all options in this policy to the set limit. If the value + * given is larger than the max prefetch value of this policy the new limit will be capped + * at the max prefetch value. + * + * @param prefetch + * The prefetch value to apply to all prefetch limits. + */ + public void setAll(int prefetch) { + this.durableTopicPrefetch = getMaxPrefetchLimit(prefetch); + this.queueBrowserPrefetch = getMaxPrefetchLimit(prefetch); + this.queuePrefetch = getMaxPrefetchLimit(prefetch); + this.topicPrefetch = getMaxPrefetchLimit(prefetch); + } + + @Override + public boolean equals(Object object) { + if (object instanceof JmsPrefetchPolicy) { + JmsPrefetchPolicy other = (JmsPrefetchPolicy) object; + return this.queuePrefetch == other.queuePrefetch && this.queueBrowserPrefetch == other.queueBrowserPrefetch + && this.topicPrefetch == other.topicPrefetch && this.durableTopicPrefetch == other.durableTopicPrefetch; + } + return false; + } + + private int getMaxPrefetchLimit(int value) { + int result = Math.min(value, maxPrefetchSize); + if (result < value) { + LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java new file mode 100644 index 0000000..d9e397c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueue.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.Queue; + +/** + * JMS Queue implementation + */ +public class JmsQueue extends JmsDestination implements Queue { + + public JmsQueue() { + super(null, false, false); + } + + public JmsQueue(String name) { + super(name, false, false); + } + + @Override + public JmsQueue copy() { + final JmsQueue copy = new JmsQueue(); + copy.setProperties(getProperties()); + return copy; + } + + /** + * @return name + * @see javax.jms.Queue#getQueueName() + */ + @Override + public String getQueueName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java new file mode 100644 index 0000000..ce20d42 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.util.Enumeration; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueBrowser; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a queue without + * removing them. + * <p/> + * <p/> + * The <CODE>getEnumeration</CODE> method returns a <CODE> + * java.util.Enumeration</CODE> that is used to scan the queue's messages. It may be an + * enumeration of the entire content of a queue, or it may contain only the messages matching a + * message selector. + * <p/> + * <p/> + * Messages may be arriving and expiring while the scan is done. The JMS API does not require + * the content of an enumeration to be a static snapshot of queue content. Whether these changes + * are visible or not depends on the JMS provider. + * <p/> + * <p/> + * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session + * </CODE> or a <CODE>QueueSession</CODE>. 
+ * + * @see javax.jms.Session#createBrowser + * @see javax.jms.QueueSession#createBrowser + * @see javax.jms.QueueBrowser + * @see javax.jms.QueueReceiver + */ +public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowser.class); + + private final JmsSession session; + private final JmsDestination destination; + private final String selector; + + private JmsMessageConsumer consumer; + private final AtomicBoolean browseDone = new AtomicBoolean(false); + + private Message next; + private final AtomicBoolean closed = new AtomicBoolean(); + private final Object semaphore = new Object(); + + /** + * Constructor for an JmsQueueBrowser - used internally + * + * @param session + * @param id + * @param destination + * @param selector + * @throws javax.jms.JMSException + */ + protected JmsQueueBrowser(JmsSession session, JmsDestination destination, String selector) throws JMSException { + this.session = session; + this.destination = destination; + this.selector = selector; + } + + private void destroyConsumer() { + if (consumer == null) { + return; + } + try { + if (session.getTransacted()) { + session.commit(); + } + consumer.close(); + consumer = null; + } catch (JMSException e) { + e.printStackTrace(); + } + } + + /** + * Gets an enumeration for browsing the current queue messages in the order they would be + * received. + * + * @return an enumeration for browsing the messages + * @throws javax.jms.JMSException + * if the JMS provider fails to get the enumeration for this browser due to some + * internal error. 
+ */ + @Override + public Enumeration<Message> getEnumeration() throws JMSException { + checkClosed(); + if (consumer == null) { + consumer = createConsumer(); + } + return this; + } + + private void checkClosed() throws IllegalStateException { + if (closed.get()) { + throw new IllegalStateException("The Consumer is closed"); + } + } + + /** + * @return true if more messages to process + */ + @Override + public boolean hasMoreElements() { + while (true) { + synchronized (this) { + if (consumer == null) { + return false; + } + } + + if (next == null) { + try { + next = consumer.receiveNoWait(); + } catch (JMSException e) { + LOG.warn("Error while receive the next message: {}", e.getMessage()); + // TODO - Add client internal error listener. + // this.session.connection.onClientInternalException(e); + } + + if (next != null) { + return true; + } + } else { + return true; + } + + if (browseDone.get() || !session.isStarted()) { + destroyConsumer(); + return false; + } + + waitForMessage(); + } + } + + /** + * @return the next message if one exists + * + * @throws NoSuchElementException if no more elements are available. + */ + @Override + public Message nextElement() { + synchronized (this) { + if (consumer == null) { + return null; + } + } + + if (hasMoreElements()) { + Message message = next; + next = null; + return message; + } + + if (browseDone.get() || !session.isStarted()) { + destroyConsumer(); + return null; + } + + throw new NoSuchElementException(); + } + + @Override + public void close() throws JMSException { + if (closed.compareAndSet(false, true)) { + browseDone.set(true); + destroyConsumer(); + } + } + + /** + * Gets the queue associated with this queue browser. + * + * @return the queue + * @throws javax.jms.JMSException + * if the JMS provider fails to get the queue associated with this browser due to + * some internal error. 
+ */ + + @Override + public Queue getQueue() throws JMSException { + return (Queue) destination; + } + + @Override + public String getMessageSelector() throws JMSException { + return selector; + } + + /** + * Wait on a semaphore for a fixed amount of time for a message to come in. + */ + protected void waitForMessage() { + try { + synchronized (semaphore) { + semaphore.wait(2000); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + protected void notifyMessageAvailable() { + synchronized (semaphore) { + semaphore.notifyAll(); + } + } + + @Override + public String toString() { + JmsMessageConsumer consumer = this.consumer; + return "JmsQueueBrowser { value=" + (consumer != null ? consumer.getConsumerId() : "null") + " }"; + } + + private JmsMessageConsumer createConsumer() throws JMSException { + browseDone.set(false); + JmsMessageConsumer rc = new JmsMessageConsumer(session.getNextConsumerId(), session, destination, selector, false) { + + @Override + public boolean isBrowser() { + return true; + } + + @Override + public void onMessage(JmsInboundMessageDispatch envelope) { + if (envelope.getMessage() == null) { + browseDone.set(true); + } else { + super.onMessage(envelope); + } + notifyMessageAvailable(); + } + }; + rc.init(); + return rc; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java new file mode 100644 index 0000000..39eadeb --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.ConnectionConsumer; +import javax.jms.JMSException; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.util.IdGenerator; + +public class JmsQueueConnection extends JmsConnection { + + public JmsQueueConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException { + super(connectionId, provider, clientIdGenerator); + } + + @Override + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { + throw new javax.jms.IllegalStateException("Operation not supported by a QueueConnection"); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + throw new javax.jms.IllegalStateException("Operation not supported by a QueueConnection"); + } + + @Override + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + throw new javax.jms.IllegalStateException("Operation not supported by a 
QueueConnection"); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java new file mode 100644 index 0000000..aa1da65 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueReceiver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueReceiver; + +import org.apache.qpid.jms.meta.JmsConsumerId; + +/** + * Implementation of a JMS QueueReceiver + */ +public class JmsQueueReceiver extends JmsMessageConsumer implements QueueReceiver { + + /** + * Constructor + * + * @param id + * This receiver's assigned Id. + * @param session + * The session that created this receiver. + * @param dest + * The destination that this receiver listens on. + * @param selector + * The selector used to filter messages for this receiver. 
+ * + * @throws JMSException + */ + protected JmsQueueReceiver(JmsConsumerId id, JmsSession session, JmsDestination dest, String selector) throws JMSException { + super(id, session, dest, selector, false); + } + + /** + * @return the Queue + * @throws IllegalStateException + * @see javax.jms.QueueReceiver#getQueue() + */ + @Override + public Queue getQueue() throws IllegalStateException { + checkClosed(); + return (Queue) this.getDestination(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java new file mode 100644 index 0000000..c2276c8 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSender.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueSender; + +import org.apache.qpid.jms.meta.JmsProducerId; + +/** + * Implementation of a Queue Sender + */ +public class JmsQueueSender extends JmsMessageProducer implements QueueSender { + + /** + * Constructor + * + * @param id + * @param session + * @param destination + */ + protected JmsQueueSender(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException { + super(id, session, destination); + } + + /** + * @return the Queue + * @throws IllegalStateException + * @see javax.jms.QueueSender#getQueue() + */ + @Override + public Queue getQueue() throws IllegalStateException { + checkClosed(); + return (Queue) this.producerInfo.getDestination(); + } + + /** + * @param queue + * @param message + * @throws JMSException + * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message) + */ + @Override + public void send(Queue queue, Message message) throws JMSException { + super.send(queue, message); + } + + /** + * @param queue + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message, int, int, long) + */ + @Override + public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + super.send(message, deliveryMode, priority, timeToLive); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java new file mode 100644 index 0000000..274c0a7 --- /dev/null +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueSession.java @@ -0,0 +1,187 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.jms;

import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;

import org.apache.qpid.jms.meta.JmsSessionId;

/**
 * JMS QueueSession implementation.
 *
 * Consumer and producer factory methods reject {@link Topic} destinations and otherwise
 * defer to the parent session; topic-only operations always fail with an
 * {@link IllegalStateException}.
 */
public class JmsQueueSession extends JmsSession {

    protected JmsQueueSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
        super(connection, sessionId, acknowledgementMode);
    }

    /**
     * @param destination
     *        the destination to consume from; must not be a Topic.
     * @return the consumer created by the parent session.
     * @throws JMSException
     *         if the destination is a Topic or the parent session fails.
     * @see javax.jms.Session#createConsumer(javax.jms.Destination)
     */
    @Override
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        if (destination instanceof Topic) {
            throw new IllegalStateException("Operation not supported by a QueueSession");
        }
        return super.createConsumer(destination);
    }

    /**
     * @param destination
     *        the destination to consume from; must not be a Topic.
     * @param messageSelector
     *        the selector passed through to the parent session.
     * @return the consumer created by the parent session.
     * @throws JMSException
     *         if the destination is a Topic or the parent session fails.
     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
     *      java.lang.String)
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        if (destination instanceof Topic) {
            throw new IllegalStateException("Operation not supported by a QueueSession");
        }
        return super.createConsumer(destination, messageSelector);
    }

    /**
     * @param destination
     *        the destination to consume from; must not be a Topic.
     * @param messageSelector
     *        the selector passed through to the parent session.
     * @param noLocal
     *        the no-local flag passed through to the parent session.
     * @return the consumer created by the parent session.
     * @throws JMSException
     *         if the destination is a Topic or the parent session fails.
     * @see javax.jms.Session#createConsumer(javax.jms.Destination,
     *      java.lang.String, boolean)
     */
    @Override
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        // Only topics are off limits here; rejecting every call made this overload
        // unusable for queues, unlike the two sibling overloads above.
        if (destination instanceof Topic) {
            throw new IllegalStateException("Operation not supported by a QueueSession");
        }
        return super.createConsumer(destination, messageSelector, noLocal);
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topic
     *        unused.
     * @param name
     *        unused.
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
     *      java.lang.String)
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topic
     *        unused.
     * @param name
     *        unused.
     * @param messageSelector
     *        unused.
     * @param noLocal
     *        unused.
     * @return never returns normally.
     * @throws IllegalStateException
     *         always.
     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
     *      java.lang.String, java.lang.String, boolean)
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws IllegalStateException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * @param destination
     *        the destination to produce to; must not be a Topic.
     * @return the producer created by the parent session.
     * @throws JMSException
     *         if the destination is a Topic or the parent session fails.
     * @see javax.jms.Session#createProducer(javax.jms.Destination)
     */
    @Override
    public MessageProducer createProducer(Destination destination) throws JMSException {
        if (destination instanceof Topic) {
            throw new IllegalStateException("Operation not supported by a QueueSession");
        }
        return super.createProducer(destination);
    }

    /**
     * Always rejected on a queue session.
     *
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.Session#createTemporaryTopic()
     */
    @Override
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topicName
     *        unused.
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.Session#createTopic(java.lang.String)
     */
    @Override
    public Topic createTopic(String topicName) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param name
     *        unused.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.Session#unsubscribe(java.lang.String)
     */
    @Override
    public void unsubscribe(String name) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topic
     *        unused.
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
     */
    @Override
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topic
     *        unused.
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }

    /**
     * Always rejected on a queue session.
     *
     * @param topic
     *        unused.
     * @param messageSelector
     *        unused.
     * @param noLocal
     *        unused.
     * @return never returns normally.
     * @throws JMSException
     *         always, as an IllegalStateException.
     * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic,
     *      java.lang.String, boolean)
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        throw new IllegalStateException("Operation not supported by a QueueSession");
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
