http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java new file mode 100644 index 0000000..a7ede86 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -0,0 +1,1002 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.apollo.filter.FilterException; +import org.apache.activemq.apollo.selector.SelectorParser; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsMessageTransformation; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.meta.JmsProducerId; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsSessionInfo; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; + +/** + * JMS Session implementation + */ +@SuppressWarnings("static-access") +public class JmsSession implements Session, QueueSession, TopicSession, JmsMessageDispatcher { + + private final JmsConnection connection; + private final int acknowledgementMode; + private final List<JmsMessageProducer> producers = new CopyOnWriteArrayList<JmsMessageProducer>(); + private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>(); + private MessageListener messageListener; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean started = new AtomicBoolean(); + private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages = + new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000); + private JmsPrefetchPolicy prefetchPolicy; + private JmsSessionInfo sessionInfo; + private ExecutorService executor; + private final ReentrantLock sendLock = new ReentrantLock(); + + private final AtomicLong consumerIdGenerator = new AtomicLong(); + private final AtomicLong producerIdGenerator = new AtomicLong(); + private JmsLocalTransactionContext transactionContext; + private JmsMessageFactory messageFactory; + + protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { + this.connection = connection; + this.acknowledgementMode = acknowledgementMode; + this.prefetchPolicy = new JmsPrefetchPolicy(connection.getPrefetchPolicy()); + + setTransactionContext(new JmsLocalTransactionContext(this)); + + this.sessionInfo = new JmsSessionInfo(sessionId); + this.sessionInfo.setAcknowledgementMode(acknowledgementMode); + this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync()); + + this.sessionInfo = connection.createResource(sessionInfo); + this.messageFactory = connection.getMessageFactory(); + } + + int acknowledgementMode() { + return this.acknowledgementMode; + } + + ////////////////////////////////////////////////////////////////////////// + // Session methods + ////////////////////////////////////////////////////////////////////////// + + @Override + public int getAcknowledgeMode() throws JMSException { + checkClosed(); + return this.acknowledgementMode; + } + + @Override + public boolean getTransacted() throws JMSException { + checkClosed(); + return isTransacted(); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + checkClosed(); + return this.messageListener; + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + checkClosed(); + this.messageListener = listener; + } + + @Override + public void recover() throws JMSException { + checkClosed(); + if (getTransacted()) { + throw new javax.jms.IllegalStateException("Cannot call recover() on a transacted session"); + } + + this.connection.recover(getSessionId()); + } + + @Override + public void commit() throws JMSException { + checkClosed(); + + if (!getTransacted()) { + throw new javax.jms.IllegalStateException("Not a transacted session"); + } + + this.transactionContext.commit(); + } + + @Override + public void rollback() throws JMSException { + checkClosed(); + if (!getTransacted()) { + throw new javax.jms.IllegalStateException("Not a transacted session"); + } + + this.transactionContext.rollback(); + + getExecutor().execute(new Runnable() { + @Override + public void run() { + for (JmsMessageConsumer c : consumers.values()) { + c.drainMessageQueueToListener(); + } + } + }); + } + + @Override + public void run() { + try { + checkClosed(); + } catch (IllegalStateException e) { + throw new RuntimeException(e); + } + + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws JMSException { + if (!closed.get()) { + doClose(); + } + } + + /** + * Shutdown the Session and release all resources. Once completed the Session can + * request that the Provider destroy the Session and it's child resources. + * + * @throws JMSException + */ + protected void doClose() throws JMSException { + boolean interrupted = Thread.interrupted(); + shutdown(); + this.connection.removeSession(this); + this.connection.destroyResource(sessionInfo); + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + /** + * This method should terminate all Session resources and prepare for disposal of the + * Session. It is called either from the Session close method or from the Connection + * when a close request is made and the Connection wants to cleanup all Session resources. + * + * This method should not attempt to send a destroy request to the Provider as that + * will either be done by another session method or is not needed when done by the parent + * Connection. + * + * @throws JMSException + */ + protected void shutdown() throws JMSException { + if (closed.compareAndSet(false, true)) { + stop(); + for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) { + consumer.shutdown(); + } + + for (JmsMessageProducer producer : this.producers) { + producer.shutdown(); + } + + try { + if (getTransactionContext().isInTransaction()) { + rollback(); + } + } catch (JMSException e) { + } + } + } + + ////////////////////////////////////////////////////////////////////////// + // Consumer creation + ////////////////////////////////////////////////////////////////////////// + + /** + * @param destination + * @return a MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination) + */ + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + return createConsumer(destination, null); + } + + /** + * @param destination + * @param messageSelector + * @return MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, + * java.lang.String) + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + return createConsumer(destination, messageSelector, false); + } + + /** + * @param destination + * @param messageSelector + * @param NoLocal + * @return the MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, + * java.lang.String, boolean) + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException { + checkClosed(); + checkDestination(destination); + messageSelector = checkSelector(messageSelector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination); + JmsTopicSubscriber result = new JmsTopicSubscriber(getNextConsumerId(), this, dest, NoLocal, messageSelector); + result.init(); + return result; + } + + /** + * @param queue + * @return QueueRecevier + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue) + */ + @Override + public QueueReceiver createReceiver(Queue queue) throws JMSException { + checkClosed(); + checkDestination(queue); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue); + JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, ""); + result.init(); + return result; + } + + /** + * @param queue + * @param messageSelector + * @return QueueReceiver + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, + * java.lang.String) + */ + @Override + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { + checkClosed(); + checkDestination(queue); + messageSelector = checkSelector(messageSelector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue); + JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, messageSelector); + result.init(); + return result; + } + + /** + * @param destination + * @return QueueBrowser + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue) + */ + @Override + public QueueBrowser createBrowser(Queue destination) throws JMSException { + return createBrowser(destination, null); + } + + /** + * @param destination + * @param messageSelector + * @return QueueBrowser + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String) + */ + @Override + public QueueBrowser createBrowser(Queue destination, String messageSelector) throws JMSException { + checkClosed(); + checkDestination(destination); + messageSelector = checkSelector(messageSelector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination); + JmsQueueBrowser result = new JmsQueueBrowser(this, dest, messageSelector); + return result; + } + + /** + * @param topic + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic) + */ + @Override + public TopicSubscriber createSubscriber(Topic topic) throws JMSException { + return createSubscriber(topic, null, false); + } + + /** + * @param topic + * @param messageSelector + * @param noLocal + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, + * java.lang.String, boolean) + */ + @Override + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { + checkClosed(); + checkDestination(topic); + messageSelector = checkSelector(messageSelector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic); + JmsTopicSubscriber result = new JmsTopicSubscriber(getNextConsumerId(), this, dest, noLocal, messageSelector); + result.init(); + return result; + } + + /** + * @param topic + * @param name + * @return a TopicSubscriber + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, + * java.lang.String) + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + return createDurableSubscriber(topic, name, null, false); + } + + /** + * @param topic + * @param name + * @param messageSelector + * @param noLocal + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, + * java.lang.String, java.lang.String, boolean) + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { + checkClosed(); + checkDestination(topic); + messageSelector = checkSelector(messageSelector); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic); + JmsTopicSubscriber result = new JmsDurableTopicSubscriber(getNextConsumerId(), this, dest, name, false, messageSelector); + result.init(); + return result; + } + + /** + * @param name + * @throws JMSException + * @see javax.jms.Session#unsubscribe(java.lang.String) + */ + @Override + public void unsubscribe(String name) throws JMSException { + checkClosed(); + this.connection.unsubscribe(name); + } + + ////////////////////////////////////////////////////////////////////////// + // Producer creation + ////////////////////////////////////////////////////////////////////////// + + /** + * @param destination + * @return MessageProducer + * @throws JMSException + * @see javax.jms.Session#createProducer(javax.jms.Destination) + */ + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + checkClosed(); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, destination); + JmsMessageProducer result = new JmsMessageProducer(getNextProducerId(), this, dest); + add(result); + return result; + } + + /** + * @param queue + * @return QueueSender + * @throws JMSException + * @see javax.jms.QueueSession#createSender(javax.jms.Queue) + */ + @Override + public QueueSender createSender(Queue queue) throws JMSException { + checkClosed(); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue); + JmsQueueSender result = new JmsQueueSender(getNextProducerId(), this, dest); + return result; + } + + /** + * @param topic + * @return TopicPublisher + * @throws JMSException + * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic) + */ + @Override + public TopicPublisher createPublisher(Topic topic) throws JMSException { + checkClosed(); + JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic); + JmsTopicPublisher result = new JmsTopicPublisher(getNextProducerId(), this, dest); + add(result); + return result; + } + + ////////////////////////////////////////////////////////////////////////// + // Message creation + ////////////////////////////////////////////////////////////////////////// + + @Override + public BytesMessage createBytesMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createBytesMessage()); + } + + @Override + public MapMessage createMapMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createMapMessage()); + } + + @Override + public Message createMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createMessage()); + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createObjectMessage(null)); + } + + @Override + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + checkClosed(); + return init(messageFactory.createObjectMessage(object)); + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createStreamMessage()); + } + + @Override + public TextMessage createTextMessage() throws JMSException { + checkClosed(); + return init(messageFactory.createTextMessage(null)); + } + + @Override + public TextMessage createTextMessage(String text) throws JMSException { + checkClosed(); + return init(messageFactory.createTextMessage(text)); + } + + ////////////////////////////////////////////////////////////////////////// + // Destination creation + ////////////////////////////////////////////////////////////////////////// + + /** + * @param queueName + * @return Queue + * @throws JMSException + * @see javax.jms.Session#createQueue(java.lang.String) + */ + @Override + public Queue createQueue(String queueName) throws JMSException { + checkClosed(); + return new JmsQueue(queueName); + } + + /** + * @param topicName + * @return Topic + * @throws JMSException + * @see javax.jms.Session#createTopic(java.lang.String) + */ + @Override + public Topic createTopic(String topicName) throws JMSException { + checkClosed(); + return new JmsTopic(topicName); + } + + /** + * @return TemporaryQueue + * @throws JMSException + * @see javax.jms.Session#createTemporaryQueue() + */ + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + checkClosed(); + return connection.createTemporaryQueue(); + } + + /** + * @return TemporaryTopic + * @throws JMSException + * @see javax.jms.Session#createTemporaryTopic() + */ + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + checkClosed(); + return connection.createTemporaryTopic(); + } + + ////////////////////////////////////////////////////////////////////////// + // Session Implementation methods + ////////////////////////////////////////////////////////////////////////// + + protected void add(JmsMessageConsumer consumer) throws JMSException { + this.consumers.put(consumer.getConsumerId(), consumer); + this.connection.addDispatcher(consumer.getConsumerId(), this); + + if (started.get()) { + consumer.start(); + } + } + + protected void remove(JmsMessageConsumer consumer) throws JMSException { + this.connection.removeDispatcher(consumer.getConsumerId()); + this.consumers.remove(consumer.getConsumerId()); + } + + protected void add(JmsMessageProducer producer) { + this.producers.add(producer); + } + + protected void remove(MessageProducer producer) { + this.producers.remove(producer); + } + + protected void onException(Exception ex) { + this.connection.onException(ex); + } + + protected void onException(JMSException ex) { + this.connection.onException(ex); + } + + protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException { + JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest); + send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp); + } + + private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException { + sendLock.lock(); + try { + startNextTransaction(); + + original.setJMSDeliveryMode(deliveryMode); + original.setJMSPriority(priority); + original.setJMSRedelivered(false); + + long timeStamp = 0; + boolean hasTTL = timeToLive > 0; + if (!disableTimestamp || hasTTL) { + timeStamp = System.currentTimeMillis(); + } + + original.setJMSTimestamp(timeStamp); + + if (hasTTL) { + original.setJMSExpiration(timeStamp + timeToLive); + } + + JmsMessageId msgId = null; + if (!disableMsgId) { + msgId = getNextMessageId(producer); + } + + boolean isJmsMessageType = original instanceof JmsMessage; + if (isJmsMessageType) { + ((JmsMessage) original).setConnection(connection); + if (!disableMsgId) { + ((JmsMessage) original).setJMSMessageID(msgId); + } + original.setJMSDestination(destination); + } + + JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original); + + // Ensure original message gets the destination and message ID as per spec. + if (!isJmsMessageType) { + if (!disableMsgId) { + original.setJMSMessageID(msgId.toString()); + copy.setJMSMessageID(msgId); + } + original.setJMSDestination(destination); + copy.setJMSDestination(destination); + } + + boolean sync = connection.isAlwaysSyncSend() || + (!connection.isForceAsyncSend() && deliveryMode == DeliveryMode.PERSISTENT && !getTransacted()); + + copy.onSend(); + JmsOutboundMessageDispatch envelope = new JmsOutboundMessageDispatch(); + envelope.setMessage(copy); + envelope.setProducerId(producer.getProducerId()); + envelope.setDestination(destination); + envelope.setSendAsync(!sync); + + this.connection.send(envelope); + } finally { + sendLock.unlock(); + } + } + + void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { + startNextTransaction(); + this.connection.acknowledge(envelope, ackType); + } + + /** + * Acknowledge all previously delivered messages in this Session as consumed. This + * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode. + * + * @throws JMSException if an error occurs while the acknowledge is processed. + */ + void acknowledge() throws JMSException { + this.connection.acknowledge(sessionInfo.getSessionId()); + } + + public boolean isClosed() { + return this.closed.get(); + } + + /** + * Checks whether the session uses transactions. + * + * @return true - if the session uses transactions. + */ + public boolean isTransacted() { + return this.acknowledgementMode == Session.SESSION_TRANSACTED; + } + + /** + * Checks whether the session used client acknowledgment. + * + * @return true - if the session uses client acknowledgment. + */ + protected boolean isClientAcknowledge() { + return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; + } + + /** + * Checks whether the session used auto acknowledgment. + * + * @return true - if the session uses client acknowledgment. + */ + public boolean isAutoAcknowledge() { + return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; + } + + /** + * Checks whether the session used dup ok acknowledgment. + * + * @return true - if the session uses client acknowledgment. + */ + public boolean isDupsOkAcknowledge() { + return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; + } + + protected void checkClosed() throws IllegalStateException { + if (this.closed.get()) { + throw new IllegalStateException("The MessageProducer is closed"); + } + } + + // This extra wrapping class around SelectorParser is used to avoid + // ClassNotFoundException if SelectorParser is not in the class path. + static class OptionalSectorParser { + public static void check(String selector) throws InvalidSelectorException { + try { + SelectorParser.parse(selector); + } catch (FilterException e) { + throw new InvalidSelectorException(e.getMessage()); + } + } + } + + static final OptionalSectorParser SELECTOR_PARSER; + static { + OptionalSectorParser parser; + try { + // lets verify it's working.. + parser = new OptionalSectorParser(); + parser.check("x=1"); + } catch (Throwable e) { + parser = null; + } + SELECTOR_PARSER = parser; + } + + public static String checkSelector(String selector) throws InvalidSelectorException { + if (selector != null) { + if (selector.trim().length() == 0) { + return null; + } + if (SELECTOR_PARSER != null) { + SELECTOR_PARSER.check(selector); + } + } + return selector; + } + + public static void checkDestination(Destination dest) throws InvalidDestinationException { + if (dest == null) { + throw new InvalidDestinationException("Destination cannot be null"); + } + } + + protected void start() throws JMSException { + if (started.compareAndSet(false, true)) { + JmsInboundMessageDispatch message = null; + while ((message = this.stoppedMessages.poll()) != null) { + deliver(message); + } + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.start(); + } + } + } + + protected void stop() throws JMSException { + started.set(false); + if (executor != null) { + executor.shutdown(); + executor = null; + } + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.stop(); + } + } + + protected boolean isStarted() { + return this.started.get(); + } + + public JmsConnection getConnection() { + return this.connection; + } + + Executor getExecutor() { + if (executor == null) { + executor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread executor = new Thread(runner); + executor.setName("JmsSession ["+ sessionInfo.getSessionId() + "] dispatcher"); + executor.setDaemon(true); + return executor; + } + }); + } + return executor; + } + + protected JmsSessionInfo getSessionInfo() { + return this.sessionInfo; + } + + protected JmsSessionId getSessionId() { + return this.sessionInfo.getSessionId(); + } + + protected JmsConsumerId getNextConsumerId() { + return new JmsConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.incrementAndGet()); + } + + protected JmsProducerId getNextProducerId() { + return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet()); + } + + private JmsMessageId getNextMessageId(JmsMessageProducer producer) { + return new JmsMessageId(producer.getProducerId(), producer.getNextMessageSequence()); + } + + private <T extends JmsMessage> T init(T message) { + message.setConnection(connection); + return message; + } + + private synchronized void startNextTransaction() throws JMSException { + if (getTransacted()) { + transactionContext.begin(); + } + } + + boolean isDestinationInUse(JmsDestination destination) { + for (JmsMessageConsumer consumer : consumers.values()) { + if (consumer.isUsingDestination(destination)) { + return true; + } + } + return false; + } + + void checkMessageListener() throws JMSException { + if (messageListener != null) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + for (JmsMessageConsumer consumer : consumers.values()) { + if (consumer.hasMessageListener()) { + throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); + } + } + } + + public JmsPrefetchPolicy getPrefetchPolicy() { + return prefetchPolicy; + } + + public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { + this.prefetchPolicy = prefetchPolicy; + } + + @Override + public void onMessage(JmsInboundMessageDispatch envelope) { + if (started.get()) { + deliver(envelope); + } else { + this.stoppedMessages.add(envelope); + } + } + + protected void onConnectionInterrupted() { + for (JmsMessageProducer producer : producers) { + producer.onConnectionInterrupted(); + } + + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.onConnectionInterrupted(); + } + } + + protected void onConnectionRecovery(Provider provider) throws Exception { + + ProviderFuture request = new ProviderFuture(); + provider.create(sessionInfo, request); + request.sync(); + + if (this.acknowledgementMode == SESSION_TRANSACTED) { + if (transactionContext.isInTransaction()) { + transactionContext.clear(); + transactionContext.begin(); + } + } + + for (JmsMessageProducer producer : producers) { + producer.onConnectionRecovery(provider); + } + + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.onConnectionRecovery(provider); + } + } + + protected void onConnectionRecovered(Provider provider) throws Exception { + + this.messageFactory = provider.getMessageFactory(); + + for (JmsMessageProducer producer : producers) { + producer.onConnectionRecovered(provider); + } + + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.onConnectionRecovered(provider); + } + } + + protected void onConnectionRestored() { + for (JmsMessageProducer producer : producers) { + producer.onConnectionRestored(); + } + + for (JmsMessageConsumer consumer : consumers.values()) { + consumer.onConnectionRestored(); + } + } + + private void deliver(JmsInboundMessageDispatch envelope) { + JmsConsumerId id = envelope.getConsumerId(); + if (id == null) { + this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage())); + } + if (this.messageListener != null) { + this.messageListener.onMessage(envelope.getMessage()); + } else { + JmsMessageConsumer consumer = this.consumers.get(id); + if (consumer != null) { + consumer.onMessage(envelope); + } + } + } + + /** + * Sets the transaction context of the session. + * + * @param transactionContext + * provides the means to control a JMS transaction. + */ + public void setTransactionContext(JmsLocalTransactionContext transactionContext) { + this.transactionContext = transactionContext; + } + + /** + * Returns the transaction context of the session. + * + * @return transactionContext + * session's transaction context. + */ + public JmsLocalTransactionContext getTransactionContext() { + return transactionContext; + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java new file mode 100644 index 0000000..cefe491 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.net.URI; + +import org.apache.qpid.jms.provider.Provider; + +/** + * SSL Aware Factory class that allows for configuration of the SSL values used + * in the Provider transports that are SSL aware. + */ +public class JmsSslConnectionFactory extends JmsConnectionFactory { + + private final JmsSslContext configured = JmsSslContext.getCurrentSslContext(); + + public JmsSslConnectionFactory() { + } + + public JmsSslConnectionFactory(String username, String password) { + super(username, password); + } + + public JmsSslConnectionFactory(String brokerURI) { + super(brokerURI); + } + + public JmsSslConnectionFactory(URI brokerURI) { + super(brokerURI); + } + + public JmsSslConnectionFactory(String username, String password, URI brokerURI) { + super(username, password, brokerURI); + } + + public JmsSslConnectionFactory(String username, String password, String brokerURI) { + super(username, password, brokerURI); + } + + @Override + protected Provider createProvider(URI brokerURI) throws Exception { + // Create and set a new instance as the current JmsSslContext for this thread + // based on current configuration settings. + JmsSslContext.setCurrentSslContext(configured.copy()); + return super.createProvider(brokerURI); + } + + public String getKeyStoreLocation() { + return configured.getKeyStoreLocation(); + } + + public void setKeyStoreLocation(String keyStoreLocation) { + this.configured.setKeyStoreLocation(keyStoreLocation); + } + + public String getKeyStorePassword() { + return configured.getKeyStorePassword(); + } + + public void setKeyStorePassword(String keyStorePassword) { + this.configured.setKeyStorePassword(keyStorePassword); + } + + public String getTrustStoreLocation() { + return configured.getTrustStoreLocation(); + } + + public void setTrustStoreLocation(String trustStoreLocation) { + this.configured.setTrustStoreLocation(trustStoreLocation); + } + + public String getTrustStorePassword() { + return configured.getTrustStorePassword(); + } + + public void setTrustStorePassword(String trustStorePassword) { + this.configured.setTrustStorePassword(trustStorePassword); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java new file mode 100644 index 0000000..feca77c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +/** + * Provides a wrapper around the SSL settings that are used by Provider transport + * instances that use an SSL encryption layer. + */ +public class JmsSslContext { + + private String keyStoreLocation; + private String keyStorePassword; + private String trustStoreLocation; + private String trustStorePassword; + + private static final JmsSslContext initial = new JmsSslContext(); + private static final ThreadLocal<JmsSslContext> current; + + static { + + initial.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore")); + initial.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + initial.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore")); + initial.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + + current = new ThreadLocal<JmsSslContext>() { + + @Override + protected JmsSslContext initialValue() { + return initial; + } + }; + } + + protected JmsSslContext() { + } + + public JmsSslContext copy() { + JmsSslContext result = new JmsSslContext(); + result.setKeyStoreLocation(keyStoreLocation); + result.setKeyStorePassword(keyStorePassword); + result.setTrustStoreLocation(trustStoreLocation); + result.setTrustStorePassword(trustStorePassword); + return result; + } + + static public void setCurrentSslContext(JmsSslContext bs) { + current.set(bs); + } + + static public JmsSslContext getCurrentSslContext() { + return current.get(); + } + + public String getKeyStoreLocation() { + return keyStoreLocation; + } + + public void setKeyStoreLocation(String keyStoreLocation) { + this.keyStoreLocation = keyStoreLocation; + } + + public String getKeyStorePassword() { + return keyStorePassword; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + public String getTrustStoreLocation() { + return trustStoreLocation; + } + + public void setTrustStoreLocation(String trustStoreLocation) { + this.trustStoreLocation = trustStoreLocation; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java new file mode 100644 index 0000000..cff489b --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; +import javax.jms.TemporaryQueue; + +/** + * Temporary Queue Object + */ +public class JmsTemporaryQueue extends JmsDestination implements TemporaryQueue { + + public JmsTemporaryQueue() { + this(null); + } + + public JmsTemporaryQueue(String name) { + super(name, false, true); + } + + @Override + public JmsTemporaryQueue copy() { + final JmsTemporaryQueue copy = new JmsTemporaryQueue(); + copy.setProperties(getProperties()); + return copy; + } + + /** + * @see javax.jms.TemporaryQueue#delete() + */ + @Override + public void delete() { + try { + tryDelete(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + /** + * @return name + * @see javax.jms.Queue#getQueueName() + */ + @Override + public String getQueueName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java new file mode 100644 index 0000000..46dfed3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; +import javax.jms.TemporaryTopic; + +/** + * Temporary Topic Object + */ +public class JmsTemporaryTopic extends JmsDestination implements TemporaryTopic { + + public JmsTemporaryTopic() { + super(null, true, true); + } + + public JmsTemporaryTopic(String name) { + super(name, true, true); + } + + @Override + public JmsTemporaryTopic copy() { + final JmsTemporaryTopic copy = new JmsTemporaryTopic(); + copy.setProperties(getProperties()); + return copy; + } + + /** + * @see javax.jms.TemporaryTopic#delete() + */ + @Override + public void delete() { + try { + tryDelete(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + /** + * @return name + * @see javax.jms.Topic#getTopicName() + */ + @Override + public String getTopicName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java new file mode 100644 index 0000000..1840cc7 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.Topic; + +/** + * JMS Topic object. + */ +public class JmsTopic extends JmsDestination implements Topic { + + public JmsTopic() { + this(null); + } + + public JmsTopic(String name) { + super(name, true, false); + } + + @Override + public JmsTopic copy() { + final JmsTopic copy = new JmsTopic(); + copy.setProperties(getProperties()); + return copy; + } + + /** + * @return the name + * @see javax.jms.Topic#getTopicName() + */ + @Override + public String getTopicName() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java new file mode 100644 index 0000000..c8fcaba --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.ConnectionConsumer; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; + +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.util.IdGenerator; + +public class JmsTopicConnection extends JmsConnection { + + public JmsTopicConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException { + super(connectionId, provider, clientIdGenerator); + } + + @Override + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + throw new javax.jms.IllegalStateException("Operation not supported by a TopicConnection"); + } + + @Override + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { + throw new javax.jms.IllegalStateException("Operation not supported by a TopicConnection"); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java new file mode 100644 index 0000000..47d5088 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Topic; +import javax.jms.TopicPublisher; + +import org.apache.qpid.jms.meta.JmsProducerId; + +/** + * Implementation of a TopicPublisher + */ +public class JmsTopicPublisher extends JmsMessageProducer implements TopicPublisher { + + /** + * Constructor + * + * @param s + * @param destination + */ + protected JmsTopicPublisher(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException { + super(id, session, destination); + } + + /** + * @return the Topic + * @throws IllegalStateException + * @see javax.jms.TopicPublisher#getTopic() + */ + @Override + public Topic getTopic() throws IllegalStateException { + checkClosed(); + return (Topic) this.producerInfo.getDestination(); + } + + /** + * @param message + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Message) + */ + @Override + public void publish(Message message) throws JMSException { + super.send(message); + } + + /** + * @param topic + * @param message + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message) + */ + @Override + public void publish(Topic topic, Message message) throws JMSException { + super.send(topic, message); + } + + /** + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Message, int, int, long) + */ + @Override + public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + super.send(message, deliveryMode, priority, timeToLive); + } + + /** + * @param topic + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message, int, int, long) + */ + @Override + public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + super.send(topic, message, deliveryMode, priority, timeToLive); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java new file mode 100644 index 0000000..ff834aa --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.TemporaryQueue; + +import org.apache.qpid.jms.meta.JmsSessionId; + +/** + * Implementation of a TopicSession + */ +public class JmsTopicSession extends JmsSession { + + protected JmsTopicSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { + super(connection, sessionId, acknowledgementMode); + } + + /** + * @param queue + * @return + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue) + */ + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @param queue + * @param messageSelector + * @return + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String) + */ + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @param destination + * @return + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination) + */ + @Override + public MessageConsumer createConsumer(Destination destination) throws JMSException { + if (destination instanceof Queue) { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + return super.createConsumer(destination); + } + + /** + * @param destination + * @param messageSelector + * @return + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, + * java.lang.String) + */ + @Override + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + if (destination instanceof Queue) { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + return super.createConsumer(destination, messageSelector); + } + + /** + * @param destination + * @return + * @throws JMSException + * @see javax.jms.Session#createProducer(javax.jms.Destination) + */ + @Override + public MessageProducer createProducer(Destination destination) throws JMSException { + if (destination instanceof Queue) { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + return super.createProducer(destination); + } + + /** + * @param queueName + * @return + * @throws JMSException + * @see javax.jms.Session#createQueue(java.lang.String) + */ + @Override + public Queue createQueue(String queueName) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @return + * @throws JMSException + * @see javax.jms.Session#createTemporaryQueue() + */ + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @param queue + * @return + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue) + */ + @Override + public QueueReceiver createReceiver(Queue queue) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @param queue + * @param messageSelector + * @return + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, + * java.lang.String) + */ + @Override + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } + + /** + * @param queue + * @return + * @throws JMSException + * @see javax.jms.QueueSession#createSender(javax.jms.Queue) + */ + @Override + public QueueSender createSender(Queue queue) throws JMSException { + throw new IllegalStateException("Operation not supported by a TopicSession"); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java new file mode 100644 index 0000000..0ef463a --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import org.apache.qpid.jms.meta.JmsConsumerId; + +/** + * Implementation of a TopicSubscriber + */ +public class JmsTopicSubscriber extends JmsMessageConsumer implements TopicSubscriber { + + /** + * Creates a non-durable TopicSubscriber + * + * @param id + * @param s + * @param destination + * @param noLocal + * @param selector + * @throws JMSException + */ + JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, boolean noLocal, String selector) throws JMSException { + super(id, s, destination, selector, noLocal); + } + + /** + * Creates a TopicSubscriber that is durable. + * + * @param id + * @param s + * @param destination + * @param name + * @param noLocal + * @param selector + * @throws JMSException + */ + JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException { + super(id, s, destination, name, selector, noLocal); + } + + /** + * @return the Topic + * @throws IllegalStateException + * @see javax.jms.TopicSubscriber#getTopic() + */ + @Override + public Topic getTopic() throws IllegalStateException { + checkClosed(); + return (Topic) this.getDestination(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java new file mode 100644 index 0000000..c0704a1 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +/** + * Allows for a listener to be notified when a transaction is started, commits + * or is rolled back. + */ +public interface JmsTransactionListener { + + void onTransactionStarted(); + + void onTransactionCommitted(); + + void onTransactionRolledBack(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java new file mode 100644 index 0000000..bda7979 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +/** + * Interface for JmsResources that are part of a running transaction to use + * to register for notifications of transaction commit and rollback in order + * to execute specific actions. + * + * One such use of this might be for a consumer to register a synchronization + * when it is closed while it's parent session is still operating inside a + * transaction. The Consumer can close itself following the commit or rollback + * of the running Transaction. + */ +public abstract class JmsTxSynchronization { + + /** + * Called after a successful commit of the current Transaction. + * + * @throws Exception + */ + public void afterCommit() throws Exception { + } + + /** + * Called after the current transaction has been rolled back either + * by a call to rollback or by a failure to complete a commit operation. + * + * @throws Exception + */ + public void afterRollback() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java new file mode 100644 index 0000000..b04b988 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.exceptions; + +public class IdConversionException extends QpidJmsException +{ + private static final long serialVersionUID = -2349723813650476823L; + + public IdConversionException(String reason) + { + super(reason); + } + + public IdConversionException(String reason, Exception cause) + { + super(reason, cause); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java new file mode 100644 index 0000000..e58c54f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.exceptions; + +import java.io.IOException; + +import javax.jms.IllegalStateException; + +/** + * An exception thrown when attempt is made to use a connection when the connection has been closed. + */ +public class JmsConnectionClosedException extends IllegalStateException { + private static final long serialVersionUID = -7975982446284065025L; + + + public JmsConnectionClosedException(IOException cause) { + super("The JMS connection has been closed: " + extractMessage(cause)); + initCause(cause); + setLinkedException(cause); + } + + public JmsConnectionClosedException() { + super("The JMS connection has been closed", "AlreadyClosed"); + } + + private static String extractMessage(IOException cause) { + String m = cause.getMessage(); + if (m == null || m.length() == 0) { + m = cause.toString(); + } + return m; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java new file mode 100644 index 0000000..e9b7068 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.exceptions; + +import java.io.IOException; + +import javax.jms.IllegalStateException; + +/** + * An exception thrown when attempt is made to use a connection when the connection has already failed. + */ +public class JmsConnectionFailedException extends IllegalStateException { + + private static final long serialVersionUID = -3386897790274799220L; + + public JmsConnectionFailedException(IOException cause) { + super("The JMS connection has failed: " + extractMessage(cause)); + initCause(cause); + setLinkedException(cause); + } + + public JmsConnectionFailedException() { + super("The JMS connection has failed due to a Transport problem", "Connection Failed"); + } + + private static String extractMessage(IOException cause) { + String m = cause.getMessage(); + if (m == null || m.length() == 0) { + m = cause.toString(); + } + return m; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java new file mode 100644 index 0000000..81f9ca8 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.exceptions; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; + +/** + * Exception support class. + * + * Factory class for creating JMSException instances based on String messages or by + * wrapping other non-JMS exception. + * + * @since 1.0 + */ +public final class JmsExceptionSupport { + + private JmsExceptionSupport() {} + + public static JMSException create(String msg, Throwable cause) { + JMSException exception = new JMSException(msg); + exception.initCause(cause); + return exception; + } + + public static JMSException create(String msg, Exception cause) { + JMSException exception = new JMSException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static JMSException create(Throwable cause) { + if (cause instanceof JMSException) { + return (JMSException) cause; + } + if (cause.getCause() instanceof JMSException) { + return (JMSException) cause.getCause(); + } + + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + JMSException exception = new JMSException(msg); + exception.initCause(cause); + return exception; + } + + public static JMSException create(Exception cause) { + if (cause instanceof JMSException) { + return (JMSException) cause; + } + if (cause.getCause() instanceof JMSException) { + return (JMSException) cause.getCause(); + } + + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + JMSException exception = new JMSException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static MessageEOFException createMessageEOFException(Exception cause) { + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + MessageEOFException exception = new MessageEOFException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static MessageFormatException createMessageFormatException(Throwable cause) { + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + MessageFormatException exception = new MessageFormatException(msg); + exception.initCause(cause); + return exception; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java new file mode 100644 index 0000000..a922530 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.exceptions; + +import javax.jms.JMSException; + +public class QpidJmsException extends JMSException +{ + private static final long serialVersionUID = 751932967255393054L; + + public QpidJmsException(String reason) + { + this(reason, null); + } + + public QpidJmsException(String reason, Exception cause) + { + super(reason); + if (cause != null) + { + setLinkedException(cause); + initCause(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java new file mode 100644 index 0000000..5d0d04a --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.jndi; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; + +import javax.naming.Context; +import javax.naming.Name; +import javax.naming.NamingException; +import javax.naming.RefAddr; +import javax.naming.Reference; +import javax.naming.StringRefAddr; +import javax.naming.spi.ObjectFactory; + +/** + * Converts objects implementing JNDIStorable into a property fields so they can be + * stored and regenerated from JNDI + * + * @since 1.0 + */ +public class JNDIReferenceFactory implements ObjectFactory { + + /** + * This will be called by a JNDIprovider when a Reference is retrieved from + * a JNDI store - and generates the original instance + * + * @param object + * the Reference object + * @param name + * the JNDI name + * @param nameCtx + * the context + * @param environment + * the environment settings used by JNDI + * + * @return the instance built from the Reference object + * + * @throws Exception + * if building the instance from Reference fails (usually class not + * found) + */ + @Override + public Object getObjectInstance(Object object, Name name, Context nameCtx, Hashtable<?, ?> environment) + throws Exception { + Object result = null; + if (object instanceof Reference) { + Reference reference = (Reference) object; + Class<?> theClass = loadClass(this, reference.getClassName()); + if (JNDIStorable.class.isAssignableFrom(theClass)) { + JNDIStorable store = (JNDIStorable) theClass.newInstance(); + Map<String, String> properties = new HashMap<String, String>(); + for (Enumeration<RefAddr> iter = reference.getAll(); iter.hasMoreElements();) { + StringRefAddr addr = (StringRefAddr) iter.nextElement(); + properties.put(addr.getType(), (addr.getContent() == null) ? "" : addr.getContent().toString()); + } + store.setProperties(properties); + result = store; + } + } else { + throw new RuntimeException("Object " + object + " is not a reference"); + } + return result; + } + + /** + * Create a Reference instance from a JNDIStorable object + * + * @param instanceClassName + * @param po + * @return Reference + * @throws NamingException + */ + public static Reference createReference(String instanceClassName, JNDIStorable po) throws NamingException { + Reference result = new Reference(instanceClassName, JNDIReferenceFactory.class.getName(), null); + try { + Map<String, String> props = po.getProperties(); + for (Map.Entry<String, String> entry : props.entrySet()) { + javax.naming.StringRefAddr addr = new javax.naming.StringRefAddr(entry.getKey(), entry.getValue()); + result.add(addr); + } + } catch (Exception e) { + throw new NamingException(e.getMessage()); + } + return result; + } + + /** + * Retrieve the class loader for a named class + * + * @param thisObj + * @param className + * @return the class + * @throws ClassNotFoundException + */ + public static Class<?> loadClass(Object thisObj, String className) throws ClassNotFoundException { + // try local ClassLoader first. + ClassLoader loader = thisObj.getClass().getClassLoader(); + Class<?> theClass; + if (loader != null) { + theClass = loader.loadClass(className); + } else { + // Will be null in jdk1.1.8 + // use default classLoader + theClass = Class.forName(className); + } + return theClass; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
