http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java new file mode 100644 index 0000000..b4d03af --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -0,0 +1,1128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.net.ssl.SSLContext; + +import org.apache.qpid.jms.exceptions.JmsConnectionFailedException; +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConnectionId; +import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsTransactionId; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderClosedException; +import 
org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.util.IdGenerator;
import org.apache.qpid.jms.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of a JMS Connection.
 *
 * The connection lazily creates its remote connection resource through the
 * configured {@link Provider} the first time an operation requires it (see
 * the private <code>connect()</code> method) and forwards all resource
 * operations (create, start, destroy, send, acknowledge, ...) to that
 * provider, blocking on a {@link ProviderFuture} until each one completes.
 */
public class JmsConnection implements Connection, TopicConnection, QueueConnection, ProviderListener {

    private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);

    private JmsConnectionInfo connectionInfo;

    private final IdGenerator clientIdGenerator;
    private boolean clientIdSet;
    private boolean sendAcksAsync;
    // Written by application threads and read from the connection executor thread.
    private volatile ExceptionListener exceptionListener;
    private final List<JmsSession> sessions = new CopyOnWriteArrayList<JmsSession>();
    private final Map<JmsConsumerId, JmsMessageDispatcher> dispatchers =
        new ConcurrentHashMap<JmsConsumerId, JmsMessageDispatcher>();
    private final AtomicBoolean connected = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicBoolean closing = new AtomicBoolean();
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean failed = new AtomicBoolean();
    private final Object connectLock = new Object();
    // Written on the executor thread in providerFailed(), read by application
    // threads in checkClosedOrFailed().
    private volatile IOException firstFailureError;
    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
    private boolean messagePrioritySupported;

    private final ThreadPoolExecutor executor;

    private URI brokerURI;
    private URI localURI;
    private SSLContext sslContext;
    private Provider provider;
    private final Set<JmsConnectionListener> connectionListeners =
        new CopyOnWriteArraySet<JmsConnectionListener>();
    private final Map<JmsDestination, JmsDestination> tempDestinations =
        new ConcurrentHashMap<JmsDestination, JmsDestination>();
    private final AtomicLong sessionIdGenerator = new AtomicLong();
    private final AtomicLong tempDestIdGenerator = new AtomicLong();
    private final AtomicLong transactionIdGenerator = new AtomicLong();
    private JmsMessageFactory messageFactory;

    /**
     * Creates a new connection on top of the given provider. This connection is
     * registered as the provider's listener and the provider is started.
     *
     * @param connectionId
     *        the unique id used to build this connection's {@link JmsConnectionId}.
     * @param provider
     *        the protocol provider that performs all remote operations.
     * @param clientIdGenerator
     *        generator used to create a client id when none was set explicitly.
     *
     * @throws JMSException if the provider fails to start.
     */
    protected JmsConnection(final String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {

        // This executor can be used for dispatching asynchronous tasks that might block or result
        // in reentrant calls to this Connection that could block.  The thread in this executor
        // will also serve as a means of preventing JVM shutdown should a client application
        // not have its own mechanism for doing so.
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                // Include the connection id so that threads of different connections can be told apart.
                Thread thread = new Thread(r, "QpidJMS Connection Executor: " + connectionId);
                return thread;
            }
        });

        this.provider = provider;
        this.provider.setProviderListener(this);
        try {
            this.provider.start();
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }

        this.clientIdGenerator = clientIdGenerator;
        this.connectionInfo = new JmsConnectionInfo(new JmsConnectionId(connectionId));
    }

    /**
     * Stops delivery, shuts down all sessions, destroys the remote connection
     * resource (when connected and not failed) and finally shuts down the executor
     * and closes the provider.  Calling this method on an already closed connection
     * is a no-op apart from the executor / provider cleanup.
     *
     * @throws JMSException if an error occurs while closing the connection.
     * @see javax.jms.Connection#close()
     */
    @Override
    public void close() throws JMSException {
        // Clear the interrupt flag so the close work can complete; it is restored in the finally block.
        boolean interrupted = Thread.interrupted();

        try {

            if (!closed.get() && !failed.get()) {
                // do not fail if already closed as specified by the JMS specification.
                doStop(false);
            }

            synchronized (this) {

                if (closed.get()) {
                    return;
                }

                closing.set(true);

                for (JmsSession session : this.sessions) {
                    session.shutdown();
                }

                this.sessions.clear();
                this.tempDestinations.clear();

                if (isConnected() && !failed.get()) {
                    ProviderFuture request = new ProviderFuture();
                    try {
                        provider.destroy(connectionInfo, request);

                        try {
                            request.sync();
                        } catch (Exception ex) {
                            // TODO - Spec is a bit vague here, we don't fail if already closed but
                            //        in this case we really aren't closed yet so there could be an
                            //        argument that at this point an exception is still valid.
                            if (ex.getCause() instanceof InterruptedException) {
                                throw (InterruptedException) ex.getCause();
                            }
                            LOG.debug("Failed destroying Connection resource: {}", ex.getMessage());
                        }
                    } catch(ProviderClosedException pce) {
                        LOG.debug("Ignoring provider closed exception during connection close");
                    }
                }

                connected.set(false);
                started.set(false);
                closing.set(false);
                closed.set(true);
            }
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        } finally {
            try {
                ThreadPoolUtils.shutdown(executor);
            } catch (Throwable e) {
                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
            }

            if (provider != null) {
                provider.close();
                provider = null;
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Called to free all Connection resources.
     *
     * @throws JMSException if shutting down a session or destroying the
     *         connection resource fails.
     */
    protected void shutdown() throws JMSException {

        // TODO - Once ConnectionConsumer is added we must shutdown those as well.

        for (JmsSession session : this.sessions) {
            session.shutdown();
        }

        if (isConnected() && !failed.get() && !closing.get()) {
            destroyResource(connectionInfo);
        }

        if (clientIdSet) {
            connectionInfo.setClientId(null);
            clientIdSet = false;
        }

        tempDestinations.clear();
        started.set(false);
        connected.set(false);
    }

    /**
     * Connection consumers are not supported; this method always throws.
     *
     * @param destination
     *        the destination to access.
     * @param messageSelector
     *        the message selector expression.
     * @param sessionPool
     *        the server session pool to associate with the consumer.
     * @param maxMessages
     *        the maximum number of messages assigned to a server session at one time.
     * @return never returns normally.
     * @throws JMSException always, either because the connection is closed / failed
     *         or because the operation is not supported.
     * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination,
     *      java.lang.String, javax.jms.ServerSessionPool, int)
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        checkClosedOrFailed();
        connect();
        throw new JMSException("Not supported");
    }

    /**
     * Durable connection consumers are not supported; this method always throws.
     *
     * @param topic
     *        the topic to access.
     * @param subscriptionName
     *        the durable subscription name.
     * @param messageSelector
     *        the message selector expression.
     * @param sessionPool
     *        the server session pool to associate with the consumer.
     * @param maxMessages
     *        the maximum number of messages assigned to a server session at one time.
     * @return never returns normally.
     * @throws JMSException always, either because the connection is closed / failed
     *         or because the operation is not supported.
     *
     * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
     *      java.lang.String, java.lang.String, javax.jms.ServerSessionPool, int)
     */
    @Override
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
                                                              String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        checkClosedOrFailed();
        connect();
        throw new JMSException("Not supported");
    }

    /**
     * Creates a new session on this connection; the session is started right away
     * when the connection has already been started.
     *
     * @param transacted
     *        true if the session should be transacted.
     * @param acknowledgeMode
     *        the requested acknowledge mode, ignored for transacted sessions.
     * @return the newly created Session.
     * @throws JMSException if the connection is closed or failed, or the mode is invalid.
     * @see javax.jms.Connection#createSession(boolean, int)
     */
    @Override
    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        connect();
        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
        JmsSession result = new JmsSession(this, getNextSessionId(), ackMode);
        addSession(result);
        if (started.get()) {
            result.start();
        }
        return result;
    }

    /**
     * @return the client id currently stored in the connection info.
     * @throws JMSException if the connection is closed or failed.
     * @see javax.jms.Connection#getClientID()
     */
    @Override
    public String getClientID() throws JMSException {
        checkClosedOrFailed();
        return this.connectionInfo.getClientId();
    }

    /**
     * @return the shared connection meta data instance.
     * @throws JMSException if the connection is closed or failed.
     * @see javax.jms.Connection#getMetaData()
     */
    @Override
    public ConnectionMetaData getMetaData() throws JMSException {
        checkClosedOrFailed();
        return JmsConnectionMetaData.INSTANCE;
    }

    /**
     * Sets the client id and immediately connects so that the id is used for the
     * remote connection.
     *
     * @param clientID
     *        the client id to use, must not be null.
     * @throws JMSException if the connection is closed or failed, the id was already
     *         set, the id is null, the connection is already connected, or the
     *         connect attempt fails.
     * @see javax.jms.Connection#setClientID(java.lang.String)
     */
    @Override
    public synchronized void setClientID(String clientID) throws JMSException {
        checkClosedOrFailed();

        if (this.clientIdSet) {
            throw new IllegalStateException("The clientID has already been set");
        }
        if (clientID == null) {
            throw new IllegalStateException("Cannot have a null clientID");
        }
        if (connected.get()) {
            throw new IllegalStateException("Cannot set the client id once connected.");
        }

        this.connectionInfo.setClientId(clientID);
        this.clientIdSet = true;

        //We weren't connected if we got this far, we should now connect now to ensure the clientID is valid.
        //TODO: determine if any resulting failure is only the result of the ClientID value, or other reasons such as auth.
        connect();
    }

    /**
     * Starts the connection and, on the transition from stopped to started,
     * every session it currently owns.
     *
     * @throws JMSException if the connection is closed or failed, or a session fails to start.
     * @see javax.jms.Connection#start()
     */
    @Override
    public void start() throws JMSException {
        checkClosedOrFailed();
        connect();
        if (this.started.compareAndSet(false, true)) {
            try {
                for (JmsSession s : this.sessions) {
                    s.start();
                }
            } catch (Exception e) {
                throw JmsExceptionSupport.create(e);
            }
        }
    }

    /**
     * @throws JMSException if the connection is closed or failed, or a session fails to stop.
     * @see javax.jms.Connection#stop()
     */
    @Override
    public void stop() throws JMSException {
        doStop(true);
    }

    /**
     * @see #stop()
     * @param checkClosed <tt>true</tt> to check for already closed and throw
     *                    {@link javax.jms.IllegalStateException} if already closed,
     *                    <tt>false</tt> to skip this check
     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
     */
    void doStop(boolean checkClosed) throws JMSException {
        if (checkClosed) {
            checkClosedOrFailed();
        }
        if (started.compareAndSet(true, false)) {
            synchronized(sessions) {
                for (JmsSession s : this.sessions) {
                    s.stop();
                }
            }
        }
    }

    /**
     * Connection consumers are not supported; this method always throws, matching
     * the behavior of the {@link Destination} based variant.
     *
     * @param topic
     *        the topic to access.
     * @param messageSelector
     *        the message selector expression.
     * @param sessionPool
     *        the server session pool to associate with the consumer.
     * @param maxMessages
     *        the maximum number of messages assigned to a server session at one time.
     * @return never returns normally.
     * @throws JMSException always, either because the connection is closed / failed
     *         or because the operation is not supported.
     * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic,
     *      java.lang.String, javax.jms.ServerSessionPool, int)
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        checkClosedOrFailed();
        connect();
        // Fail explicitly rather than hand the caller a null consumer.
        throw new JMSException("Not supported");
    }

    /**
     * Creates a new topic session on this connection; the session is started right
     * away when the connection has already been started.
     *
     * @param transacted
     *        true if the session should be transacted.
     * @param acknowledgeMode
     *        the requested acknowledge mode, ignored for transacted sessions.
     * @return the newly created TopicSession.
     * @throws JMSException if the connection is closed or failed, or the mode is invalid.
     * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
     */
    @Override
    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        connect();
        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
        JmsTopicSession result = new JmsTopicSession(this, getNextSessionId(), ackMode);
        addSession(result);
        if (started.get()) {
            result.start();
        }
        return result;
    }

    /**
     * Connection consumers are not supported; this method always throws, matching
     * the behavior of the {@link Destination} based variant.
     *
     * @param queue
     *        the queue to access.
     * @param messageSelector
     *        the message selector expression.
     * @param sessionPool
     *        the server session pool to associate with the consumer.
     * @param maxMessages
     *        the maximum number of messages assigned to a server session at one time.
     * @return never returns normally.
     * @throws JMSException always, either because the connection is closed / failed
     *         or because the operation is not supported.
     * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue,
     *      java.lang.String, javax.jms.ServerSessionPool, int)
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        checkClosedOrFailed();
        connect();
        // Fail explicitly rather than hand the caller a null consumer.
        throw new JMSException("Not supported");
    }

    /**
     * Creates a new queue session on this connection; the session is started right
     * away when the connection has already been started.
     *
     * @param transacted
     *        true if the session should be transacted.
     * @param acknowledgeMode
     *        the requested acknowledge mode, ignored for transacted sessions.
     * @return the newly created QueueSession.
     * @throws JMSException if the connection is closed or failed, or the mode is invalid.
     * @see javax.jms.QueueConnection#createQueueSession(boolean, int)
     */
    @Override
    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed();
        connect();
        int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
        JmsQueueSession result = new JmsQueueSession(this, getNextSessionId(), ackMode);
        addSession(result);
        if (started.get()) {
            result.start();
        }
        return result;
    }

    /**
     * Converts the given exception to a JMSException and passes it to the
     * registered ExceptionListener, if any.
     *
     * @param ex
     *        the exception to report.
     */
    public void onException(Exception ex) {
        onException(JmsExceptionSupport.create(ex));
    }

    /**
     * Passes the given exception to the registered ExceptionListener, if any.
     *
     * @param ex
     *        the exception to report.
     */
    public void onException(JMSException ex) {
        ExceptionListener l = this.exceptionListener;
        if (l != null) {
            l.onException(JmsExceptionSupport.create(ex));
        }
    }

    /**
     * Resolves the effective acknowledge mode for a new session.
     *
     * @param transacted
     *        true if the session is transacted.
     * @param acknowledgeMode
     *        the mode requested by the caller.
     * @return {@link Session#SESSION_TRANSACTED} for transacted sessions, otherwise
     *         the requested mode.
     * @throws JMSException if SESSION_TRANSACTED is requested for a non-transacted session.
     */
    protected int getSessionAcknowledgeMode(boolean transacted, int acknowledgeMode) throws JMSException {
        int result = acknowledgeMode;
        if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
        }
        if (transacted) {
            result = Session.SESSION_TRANSACTED;
        }
        return result;
    }

    protected void removeSession(JmsSession session) throws JMSException {
        this.sessions.remove(session);
    }

    protected void addSession(JmsSession s) {
        this.sessions.add(s);
    }

    protected void addDispatcher(JmsConsumerId consumerId, JmsMessageDispatcher dispatcher) {
        dispatchers.put(consumerId, dispatcher);
    }

    protected void removeDispatcher(JmsConsumerId consumerId) {
        dispatchers.remove(consumerId);
    }

    /**
     * Creates the remote connection resource if that has not happened yet.  A
     * client id is generated first when none (or only a blank one) has been set.
     * Does nothing when already connected or closed.
     */
    private void connect() throws JMSException {
        synchronized(this.connectLock) {
            if (isConnected() || closed.get()) {
                return;
            }

            if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
                connectionInfo.setClientId(clientIdGenerator.generateId());
            }

            this.connectionInfo = createResource(connectionInfo);
            this.connected.set(true);
            this.messageFactory = provider.getMessageFactory();

            // TODO - Advisory Support.
            //
            // Providers should have an interface for adding a listener for temporary
            // destination advisory messages for create / destroy so we can track them
            // and throw exceptions when producers try to send to deleted destinations.
        }
    }

    /**
     * @return a newly initialized TemporaryQueue instance.
     * @throws JMSException if the provider fails to create the temporary queue.
     */
    protected TemporaryQueue createTemporaryQueue() throws JMSException {
        String destinationName = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
        JmsTemporaryQueue queue = new JmsTemporaryQueue(destinationName);
        queue = createResource(queue);
        tempDestinations.put(queue, queue);
        return queue;
    }

    /**
     * @return a newly initialized TemporaryTopic instance.
     * @throws JMSException if the provider fails to create the temporary topic.
     */
    protected TemporaryTopic createTemporaryTopic() throws JMSException {
        String destinationName = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
        JmsTemporaryTopic topic = new JmsTemporaryTopic(destinationName);
        topic = createResource(topic);
        tempDestinations.put(topic, topic);
        return topic;
    }

    /**
     * Destroys the given destination unless one of this connection's sessions
     * reports that it is still in use.
     *
     * @param destination
     *        the destination to delete.
     * @throws JMSException if the connection is closed or failed, the destination is
     *         in use, or the provider fails to destroy it.
     */
    protected void deleteDestination(JmsDestination destination) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {

            for (JmsSession session : this.sessions) {
                if (session.isDestinationInUse(destination)) {
                    throw new JMSException("A consumer is consuming from the temporary destination");
                }
            }

            if (destination.isTemporary()) {
                tempDestinations.remove(destination);
            }

            destroyResource(destination);
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * @throws JMSException if this connection has been closed, or has failed; in the
     *         latter case the exception wraps the first recorded failure.
     */
    protected void checkClosedOrFailed() throws JMSException {
        checkClosed();
        if (failed.get()) {
            throw new JmsConnectionFailedException(firstFailureError);
        }
    }

    /**
     * @throws IllegalStateException if this connection has been closed.
     */
    protected void checkClosed() throws IllegalStateException {
        if (this.closed.get()) {
            throw new IllegalStateException("The Connection is closed");
        }
    }

    protected JmsSessionId getNextSessionId() {
        return new JmsSessionId(connectionInfo.getConnectionId(), sessionIdGenerator.incrementAndGet());
    }

    protected JmsTransactionId getNextTransactionId() {
        return new JmsTransactionId(connectionInfo.getConnectionId(), transactionIdGenerator.incrementAndGet());
    }

    ////////////////////////////////////////////////////////////////////////////
    // Provider interface methods
    ////////////////////////////////////////////////////////////////////////////

    <T extends JmsResource> T createResource(T resource) throws JMSException {
        checkClosedOrFailed();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.create(resource, request);
            request.sync();
            return resource;
        } catch (Exception ex) {
            throw JmsExceptionSupport.create(ex);
        }
    }

    void startResource(JmsResource resource) throws JMSException {
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.start(resource, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void destroyResource(JmsResource resource) throws JMSException {
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.destroy(resource, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void send(JmsOutboundMessageDispatch envelope) throws JMSException {
        checkClosedOrFailed();
        connect();

        // TODO - We don't currently have a way to say that an operation
        //        should be done asynchronously.  A send can be done async
        //        in many cases, such as non-persistent delivery.  We probably
        //        don't need to do anything here though just have a way to
        //        configure the provider for async sends which we do in the
        //        JmsConnectionInfo.  Here we just need to register a listener
        //        on the request to know when it completes if we want to do
        //        JMS 2.0 style async sends where we signal a callback, then
        //        we can manage order of callback events to async senders at
        //        this level.
        try {
            ProviderFuture request = new ProviderFuture();
            provider.send(envelope, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.acknowledge(envelope, ackType, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void acknowledge(JmsSessionId sessionId) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.acknowledge(sessionId, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void unsubscribe(String name) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.unsubscribe(name, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void commit(JmsSessionId sessionId) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.commit(sessionId, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void rollback(JmsSessionId sessionId) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.rollback(sessionId, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void recover(JmsSessionId sessionId) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.recover(sessionId, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    void pull(JmsConsumerId consumerId, long timeout) throws JMSException {
        checkClosedOrFailed();
        connect();

        try {
            ProviderFuture request = new ProviderFuture();
            provider.pull(consumerId, timeout, request);
            request.sync();
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // Property setters and getters
    ////////////////////////////////////////////////////////////////////////////

    /**
     * @return the currently registered ExceptionListener, or null if none is set.
     * @throws JMSException if the connection is closed or failed.
     * @see javax.jms.Connection#getExceptionListener()
     */
    @Override
    public ExceptionListener getExceptionListener() throws JMSException {
        checkClosedOrFailed();
        return this.exceptionListener;
    }

    /**
     * @param listener
     *        the ExceptionListener to register, may be null to clear it.
     * @throws JMSException if the connection is closed or failed.
     * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
     */
    @Override
    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        checkClosedOrFailed();
        this.exceptionListener = listener;
    }

    /**
     * Adds a JmsConnectionListener so that a client can be notified of events in
     * the underlying protocol provider.
     *
     * @param listener
     *        the new listener to add to the collection.
     */
    public void addConnectionListener(JmsConnectionListener listener) {
        this.connectionListeners.add(listener);
    }

    /**
     * Removes a JmsConnectionListener that was previously registered.
     *
     * @param listener
     *        the listener to remove from the collection.
     */
    public void removeConnectionListener(JmsConnectionListener listener) {
        this.connectionListeners.remove(listener);
    }

    /**
     * Removes a JmsConnectionListener that was previously registered.
     *
     * Retained for existing callers; behaves exactly like
     * {@link #removeConnectionListener(JmsConnectionListener)}, whose name matches
     * {@link #addConnectionListener(JmsConnectionListener)}.
     *
     * @param listener
     *        the listener to remove from the collection.
     */
    public void removeTransportListener(JmsConnectionListener listener) {
        removeConnectionListener(listener);
    }

    public boolean isForceAsyncSend() {
        return connectionInfo.isForceAsyncSend();
    }

    public void setForceAsyncSend(boolean forceAsyncSend) {
        connectionInfo.setForceAsyncSends(forceAsyncSend);
    }

    public boolean isAlwaysSyncSend() {
        return connectionInfo.isAlwaysSyncSend();
    }

    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
        this.connectionInfo.setAlwaysSyncSend(alwaysSyncSend);
    }

    public String getTopicPrefix() {
        return connectionInfo.getTopicPrefix();
    }

    public void setTopicPrefix(String topicPrefix) {
        connectionInfo.setTopicPrefix(topicPrefix);
    }

    public String getTempTopicPrefix() {
        return connectionInfo.getTempTopicPrefix();
    }

    public void setTempTopicPrefix(String tempTopicPrefix) {
        connectionInfo.setTempTopicPrefix(tempTopicPrefix);
    }

    public String getTempQueuePrefix() {
        return connectionInfo.getTempQueuePrefix();
    }

    public void setTempQueuePrefix(String tempQueuePrefix) {
        connectionInfo.setTempQueuePrefix(tempQueuePrefix);
    }

    public String getQueuePrefix() {
        return connectionInfo.getQueuePrefix();
    }

    public void setQueuePrefix(String queuePrefix) {
        connectionInfo.setQueuePrefix(queuePrefix);
    }

    public boolean isOmitHost() {
        return connectionInfo.isOmitHost();
    }

    public void setOmitHost(boolean omitHost) {
        connectionInfo.setOmitHost(omitHost);
    }

    public JmsPrefetchPolicy getPrefetchPolicy() {
        return prefetchPolicy;
    }

    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
        this.prefetchPolicy = prefetchPolicy;
    }

    public boolean isMessagePrioritySupported() {
        return messagePrioritySupported;
    }

    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
        this.messagePrioritySupported = messagePrioritySupported;
    }

    public long getCloseTimeout() {
        return connectionInfo.getCloseTimeout();
    }

    public void setCloseTimeout(long closeTimeout) {
        connectionInfo.setCloseTimeout(closeTimeout);
    }

    public long getConnectTimeout() {
        return this.connectionInfo.getConnectTimeout();
    }

    public void setConnectTimeout(long connectTimeout) {
        this.connectionInfo.setConnectTimeout(connectTimeout);
    }

    public long getSendTimeout() {
        return connectionInfo.getSendTimeout();
    }

    public void setSendTimeout(long sendTimeout) {
        connectionInfo.setSendTimeout(sendTimeout);
    }

    public long getRequestTimeout() {
        return connectionInfo.getRequestTimeout();
    }

    public void setRequestTimeout(long requestTimeout) {
        connectionInfo.setRequestTimeout(requestTimeout);
    }

    public URI getBrokerURI() {
        return brokerURI;
    }

    public void setBrokerURI(URI brokerURI) {
        this.brokerURI = brokerURI;
    }

    public URI getLocalURI() {
        return localURI;
    }

    public void setLocalURI(URI localURI) {
        this.localURI = localURI;
    }

    public SSLContext getSslContext() {
        return sslContext;
    }

    public void setSslContext(SSLContext sslContext) {
        this.sslContext = sslContext;
    }

    public String getUsername() {
        return this.connectionInfo.getUsername();
    }

    public void setUsername(String username) {
        this.connectionInfo.setUsername(username);
    }

    public String getPassword() {
        return this.connectionInfo.getPassword();
    }

    public void setPassword(String password) {
        this.connectionInfo.setPassword(password);
    }

    public Provider getProvider() {
        return provider;
    }

    void setProvider(Provider provider) {
        this.provider = provider;
    }

    public boolean isConnected() {
        return this.connected.get();
    }

    public boolean isStarted() {
        return this.started.get();
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    JmsConnectionId getConnectionId() {
        return this.connectionInfo.getConnectionId();
    }

    public boolean isWatchRemoteDestinations() {
        return this.connectionInfo.isWatchRemoteDestinations();
    }

    public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
        this.connectionInfo.setWatchRemoteDestinations(watchRemoteDestinations);
    }

    public JmsMessageFactory getMessageFactory() {
        return messageFactory;
    }

    public boolean isSendAcksAsync() {
        return sendAcksAsync;
    }

    public void setSendAcksAsync(boolean sendAcksAsync) {
        this.sendAcksAsync = sendAcksAsync;
    }

    /**
     * Marks the incoming message read-only, hands the envelope to the dispatcher
     * registered for its consumer id (if any) and then notifies all connection
     * listeners.
     */
    @Override
    public void onMessage(JmsInboundMessageDispatch envelope) {

        JmsMessage incoming = envelope.getMessage();
        // Ensure incoming Messages are in readonly mode.
        if (incoming != null) {
            incoming.setReadOnlyBody(true);
            incoming.setReadOnlyProperties(true);
        }

        JmsMessageDispatcher dispatcher = dispatchers.get(envelope.getConsumerId());
        if (dispatcher != null) {
            dispatcher.onMessage(envelope);
        }
        for (JmsConnectionListener listener : connectionListeners) {
            listener.onMessage(envelope);
        }
    }

    @Override
    public void onConnectionInterrupted(URI remoteURI) {
        for (JmsSession session : sessions) {
            session.onConnectionInterrupted();
        }

        for (JmsConnectionListener listener : connectionListeners) {
            listener.onConnectionInterrupted(remoteURI);
        }
    }

    /**
     * Recreates the connection resource, all tracked temporary destinations and
     * then lets every session recover its own resources on the given provider.
     */
    @Override
    public void onConnectionRecovery(Provider provider) throws Exception {
        // TODO - Recover Advisory Consumer once we can support it.

        LOG.debug("Connection {} is starting recovery.", connectionInfo.getConnectionId());

        ProviderFuture request = new ProviderFuture();
        provider.create(connectionInfo, request);
        request.sync();

        for (JmsDestination tempDestination : tempDestinations.values()) {
            createResource(tempDestination);
        }

        for (JmsSession session : sessions) {
            session.onConnectionRecovery(provider);
        }
    }

    @Override
    public void onConnectionRecovered(Provider provider) throws Exception {
        LOG.debug("Connection {} is finalizing recovery.", connectionInfo.getConnectionId());

        this.messageFactory = provider.getMessageFactory();

        for (JmsSession session : sessions) {
            session.onConnectionRecovered(provider);
        }
    }

    @Override
    public void onConnectionRestored(URI remoteURI) {
        for (JmsSession session : sessions) {
            session.onConnectionRestored();
        }

        for (JmsConnectionListener listener : connectionListeners) {
            listener.onConnectionRestored(remoteURI);
        }
    }

    /**
     * Reports the failure to the ExceptionListener and, unless the connection is
     * closing or closed, schedules the failure handling (mark failed, close the
     * provider, shut down, notify connection listeners) on the executor.
     */
    @Override
    public void onConnectionFailure(final IOException ex) {
        onAsyncException(ex);
        if (!closing.get() && !closed.get()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    providerFailed(ex);
                    if (provider != null) {
                        try {
                            provider.close();
                        } catch (Throwable error) {
                            LOG.debug("Error while closing failed Provider: {}", error.getMessage());
                        }
                    }

                    try {
                        shutdown();
                    } catch (JMSException e) {
                        LOG.warn("Exception during connection cleanup, " + e, e);
                    }

                    for (JmsConnectionListener listener : connectionListeners) {
                        listener.onConnectionFailure(ex);
                    }
                }
            });
        }
    }

    /**
     * Handles any asynchronous errors that occur from the JMS framework classes.
     *
     * If any listeners are registered they will be notified of the error from a thread
     * in the Connection's Executor service.
     *
     * @param error
     *        The exception that triggered this error.
     */
    public void onAsyncException(Throwable error) {
        if (!closed.get() && !closing.get()) {
            // Capture the listener once: the field may be cleared by another thread
            // between this check and the time the task runs on the executor.
            final ExceptionListener listener = this.exceptionListener;
            if (listener != null) {

                if (!(error instanceof JMSException)) {
                    error = JmsExceptionSupport.create(error);
                }
                final JMSException jmsError = (JMSException)error;

                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        listener.onException(jmsError);
                    }
                });
            } else {
                LOG.debug("Async exception with no exception listener: " + error, error);
            }
        }
    }

    /**
     * Marks this connection as failed and remembers the first failure cause.
     *
     * @param error
     *        the error reported by the provider.
     */
    protected void providerFailed(IOException error) {
        // Record the cause before publishing the failed flag so that a thread which
        // observes failed == true in checkClosedOrFailed() also sees the cause.
        if (firstFailureError == null) {
            firstFailureError = error;
        }
        failed.set(true);
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java new file mode 100644 index 0000000..1333a5e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java @@ -0,0 +1,664 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.jndi.JNDIStorable; +import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.util.IdGenerator; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.jms.util.URISupport; +import org.apache.qpid.jms.util.URISupport.CompositeData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JMS ConnectionFactory Implementation. 
+ */
public class JmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {

    private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionFactory.class);

    private URI brokerURI;
    private URI localURI;
    private String username;
    private String password;
    private boolean forceAsyncSend;
    private boolean alwaysSyncSend;
    private boolean sendAcksAsync;
    private boolean omitHost;
    private boolean messagePrioritySupported = true;
    private String queuePrefix = "queue://";
    private String topicPrefix = "topic://";
    private String tempQueuePrefix = "temp-queue://";
    private String tempTopicPrefix = "temp-topic://";
    private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
    private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
    private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
    private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
    private boolean watchRemoteDestinations = true;
    private IdGenerator clientIdGenerator;
    private String clientIDPrefix;
    private IdGenerator connectionIdGenerator;
    private String connectionIDPrefix;
    private ExceptionListener exceptionListener;

    private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();

    /**
     * Creates a factory with no broker URI and no credentials configured.
     */
    public JmsConnectionFactory() {
    }

    /**
     * Creates a factory with default credentials but no broker URI.
     *
     * @param username
     *        the user name used when none is passed to a create method.
     * @param password
     *        the password used when none is passed to a create method.
     */
    public JmsConnectionFactory(String username, String password) {
        setUsername(username);
        setPassword(password);
    }

    /**
     * Creates a factory for the given broker URI.
     *
     * @param brokerURI
     *        the broker URI in string form.
     *
     * @throws IllegalArgumentException if the value is null, blank or not a valid URI.
     */
    public JmsConnectionFactory(String brokerURI) {
        this(createURI(brokerURI));
    }

    /**
     * Creates a factory for the given broker URI.
     *
     * @param brokerURI
     *        the broker URI.
     *
     * @throws IllegalArgumentException if the URI is null or its options cannot be applied.
     */
    public JmsConnectionFactory(URI brokerURI) {
        setBrokerURI(toUriString(brokerURI));
    }

    /**
     * Creates a factory for the given broker URI with default credentials.
     *
     * @param userName
     *        the user name used when none is passed to a create method.
     * @param password
     *        the password used when none is passed to a create method.
     * @param brokerURI
     *        the broker URI.
     *
     * @throws IllegalArgumentException if the URI is null or its options cannot be applied.
     */
    public JmsConnectionFactory(String userName, String password, URI brokerURI) {
        setUsername(userName);
        setPassword(password);
        setBrokerURI(toUriString(brokerURI));
    }

    /**
     * Creates a factory for the given broker URI with default credentials.
     *
     * @param userName
     *        the user name used when none is passed to a create method.
     * @param password
     *        the password used when none is passed to a create method.
     * @param brokerURI
     *        the broker URI in string form.
     *
     * @throws IllegalArgumentException if the URI is null, blank, invalid or its options
     *         cannot be applied.
     */
    public JmsConnectionFactory(String userName, String password, String brokerURI) {
        setUsername(userName);
        setPassword(password);
        setBrokerURI(brokerURI);
    }

    /**
     * Applies the given properties to this factory after converting every key and value
     * to its string form.
     *
     * @param props
     *        the properties to apply.
     */
    public void setProperties(Properties props) {
        Map<String, String> map = new HashMap<String, String>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue().toString());
        }
        setProperties(map);
    }

    /**
     * Applies the given properties to this factory.
     *
     * @param map
     *        the property names and values to apply.
     */
    @Override
    public void setProperties(Map<String, String> map) {
        buildFromProperties(map);
    }

    /**
     * Configures this factory from the given properties using PropertyUtil.
     *
     * @param map
     *        the property names and values to apply.
     */
    @Override
    protected void buildFromProperties(Map<String, String> map) {
        PropertyUtil.setProperties(this, map);
    }

    /**
     * Copies the properties PropertyUtil reads from this factory into the given map.
     * This is best effort: a failure is logged and leaves the map as it was.
     *
     * @param map
     *        the map that receives the properties.
     */
    @Override
    protected void populateProperties(Map<String, String> map) {
        try {
            Map<String, String> result = PropertyUtil.getProperties(this);
            map.putAll(result);
        } catch (Exception e) {
            // Report through the logger rather than dumping the trace on stderr.
            LOG.warn("Failed to read the properties of the ConnectionFactory", e);
        }
    }

    /**
     * Creates a TopicConnection using the credentials configured on this factory.
     *
     * @return a TopicConnection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.TopicConnectionFactory#createTopicConnection()
     */
    @Override
    public TopicConnection createTopicConnection() throws JMSException {
        return createTopicConnection(getUsername(), getPassword());
    }

    /**
     * Creates a TopicConnection using the given credentials.
     *
     * @param username
     *        the user name set on the new connection.
     * @param password
     *        the password set on the new connection.
     * @return a TopicConnection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String,
     *      java.lang.String)
     */
    @Override
    public TopicConnection createTopicConnection(String username, String password) throws JMSException {
        try {
            String connectionId = getConnectionIdGenerator().generateId();
            Provider provider = createProvider(brokerURI);
            JmsTopicConnection result = new JmsTopicConnection(connectionId, provider, getClientIdGenerator());
            return configureConnection(result, username, password);
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Creates a Connection using the credentials configured on this factory.
     *
     * @return a Connection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.ConnectionFactory#createConnection()
     */
    @Override
    public Connection createConnection() throws JMSException {
        return createConnection(getUsername(), getPassword());
    }

    /**
     * Creates a Connection using the given credentials.
     *
     * @param username
     *        the user name set on the new connection.
     * @param password
     *        the password set on the new connection.
     * @return a Connection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
     */
    @Override
    public Connection createConnection(String username, String password) throws JMSException {
        try {
            String connectionId = getConnectionIdGenerator().generateId();
            Provider provider = createProvider(brokerURI);
            JmsConnection result = new JmsConnection(connectionId, provider, getClientIdGenerator());
            return configureConnection(result, username, password);
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Creates a QueueConnection using the credentials configured on this factory.
     *
     * @return a QueueConnection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.QueueConnectionFactory#createQueueConnection()
     */
    @Override
    public QueueConnection createQueueConnection() throws JMSException {
        return createQueueConnection(getUsername(), getPassword());
    }

    /**
     * Creates a QueueConnection using the given credentials.
     *
     * @param username
     *        the user name set on the new connection.
     * @param password
     *        the password set on the new connection.
     * @return a QueueConnection
     * @throws JMSException if the connection cannot be created.
     * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String,
     *      java.lang.String)
     */
    @Override
    public QueueConnection createQueueConnection(String username, String password) throws JMSException {
        try {
            String connectionId = getConnectionIdGenerator().generateId();
            Provider provider = createProvider(brokerURI);
            JmsQueueConnection result = new JmsQueueConnection(connectionId, provider, getClientIdGenerator());
            return configureConnection(result, username, password);
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Copies this factory's properties onto a new connection and then sets the
     * exception listener, the credentials and the broker URI on it.
     *
     * @param connection
     *        the connection to configure.
     * @param username
     *        the user name to set on the connection.
     * @param password
     *        the password to set on the connection.
     * @return the connection that was passed in.
     * @throws JMSException if any step of the configuration fails.
     */
    protected <T extends JmsConnection> T configureConnection(T connection, String username, String password) throws JMSException {
        try {
            PropertyUtil.setProperties(connection, PropertyUtil.getProperties(this));
            connection.setExceptionListener(exceptionListener);
            connection.setUsername(username);
            connection.setPassword(password);
            connection.setBrokerURI(brokerURI);
            return connection;
        } catch (Exception e) {
            throw JmsExceptionSupport.create(e);
        }
    }

    /**
     * Creates the Provider for the given URI through the ProviderFactory.
     *
     * @param brokerURI
     *        the URI the provider is created for.
     * @return the new Provider.
     * @throws Exception the error raised by the ProviderFactory, after it has been logged.
     */
    protected Provider createProvider(URI brokerURI) throws Exception {
        Provider result = null;

        try {
            result = ProviderFactory.createAsync(brokerURI);
        } catch (Exception ex) {
            // A null URI must not raise a second exception here and hide the real error.
            LOG.error("Failed to create JMS Provider instance for: {}", brokerURI != null ? brokerURI.getScheme() : null);
            LOG.trace("Error: ", ex);
            throw ex;
        }

        return result;
    }

    /**
     * Parses the given string into a URI.
     *
     * @param name
     *        the URI in string form.
     * @return the parsed URI, or null if the value is null or blank.
     * @throws IllegalArgumentException if the value is not a valid URI.
     */
    protected static URI createURI(String name) {
        if (name == null || name.trim().isEmpty()) {
            return null;
        }

        try {
            return new URI(name);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid broker URI: " + name, e);
        }
    }

    /**
     * Returns the connection id generator, creating it on first use with the
     * configured connection id prefix if there is one.
     *
     * @return the generator used for connection ids.
     */
    protected synchronized IdGenerator getConnectionIdGenerator() {
        if (connectionIdGenerator == null) {
            if (connectionIDPrefix != null) {
                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
            } else {
                connectionIdGenerator = new IdGenerator();
            }
        }
        return connectionIdGenerator;
    }

    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
        this.connectionIdGenerator = connectionIdGenerator;
    }

    /** Returns the string form of the URI, or null so that setBrokerURI rejects it. */
    private static String toUriString(URI uri) {
        return uri != null ? uri.toString() : null;
    }

    /** Applies the "jms." options to this factory and fails if any of them cannot be set. */
    private void applyJmsOptions(Map<String, String> jmsOptionsMap) {
        if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
            String msg = ""
                + " Not all jms options could be set on the ConnectionFactory."
                + " Check the options are spelled correctly."
                + " Given parameters=[" + jmsOptionsMap + "]."
                + " This connection factory cannot be started.";
            throw new IllegalArgumentException(msg);
        }
    }

    //////////////////////////////////////////////////////////////////////////
    // Property getters and setters
    //////////////////////////////////////////////////////////////////////////

    /**
     * @return the brokerURI in string form, or an empty string if none is set.
     */
    public String getBrokerURI() {
        return this.brokerURI != null ? this.brokerURI.toString() : "";
    }

    /**
     * Sets the broker URI. Options carrying the "jms." prefix, taken from the query
     * string or from the parameters of a composite URI, are applied to this factory
     * and the stored URI is rebuilt by PropertyUtil / URISupport afterwards.
     *
     * @param brokerURI
     *        the brokerURI to set
     *
     * @throws IllegalArgumentException if the value is null, blank, not a valid URI or
     *         carries a jms option that cannot be set on this factory.
     */
    public void setBrokerURI(String brokerURI) {
        if (brokerURI == null) {
            throw new IllegalArgumentException("brokerURI cannot be null");
        }

        URI parsed = createURI(brokerURI);
        if (parsed == null) {
            // createURI answers null for a blank string; reject it with a usable message.
            throw new IllegalArgumentException("brokerURI cannot be empty");
        }
        this.brokerURI = parsed;

        try {
            if (this.brokerURI.getQuery() != null) {
                Map<String, String> map = PropertyUtil.parseQuery(this.brokerURI.getQuery());
                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");

                applyJmsOptions(jmsOptionsMap);
                this.brokerURI = PropertyUtil.replaceQuery(this.brokerURI, map);
            } else if (URISupport.isCompositeURI(this.brokerURI)) {
                CompositeData data = URISupport.parseComposite(this.brokerURI);
                Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");

                applyJmsOptions(jmsOptionsMap);
                this.brokerURI = data.toURI();
            }
        } catch (IllegalArgumentException e) {
            throw e;
        } catch (Exception e) {
            // Keep the original failure as the cause instead of only its message.
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    /**
     * @return the localURI in string form, or an empty string if none is set.
     */
    public String getLocalURI() {
        return this.localURI != null ? this.localURI.toString() : "";
    }

    /**
     * @param localURI
     *        the localURI to set; a null or blank value clears it.
     */
    public void setLocalURI(String localURI) {
        this.localURI = createURI(localURI);
    }

    /**
     * @return the username
     */
    public String getUsername() {
        return this.username;
    }

    /**
     * @param username
     *        the username to set
     */
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * @return the password
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * @param password
     *        the password to set
     */
    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isForceAsyncSend() {
        return forceAsyncSend;
    }

    public void setForceAsyncSend(boolean forceAsyncSend) {
        this.forceAsyncSend = forceAsyncSend;
    }

    public boolean isOmitHost() {
        return omitHost;
    }

    public void setOmitHost(boolean omitHost) {
        this.omitHost = omitHost;
    }

    /**
     * @return the messagePrioritySupported configuration option.
     */
    public boolean isMessagePrioritySupported() {
        return this.messagePrioritySupported;
    }

    /**
     * Enables message priority support in MessageConsumer instances. This results
     * in all prefetched messages being dispatched in priority order.
     *
     * @param messagePrioritySupported the messagePrioritySupported to set
     */
    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
        this.messagePrioritySupported = messagePrioritySupported;
    }

    /**
     * Returns the prefix applied to Queues that are created by the client.
     *
     * @return the currently configured Queue prefix.
     */
    public String getQueuePrefix() {
        return queuePrefix;
    }

    public void setQueuePrefix(String queuePrefix) {
        this.queuePrefix = queuePrefix;
    }

    /**
     * Returns the prefix applied to Temporary Queues that are created by the client.
     *
     * @return the currently configured Temporary Queue prefix.
     */
    public String getTempQueuePrefix() {
        return tempQueuePrefix;
    }

    public void setTempQueuePrefix(String tempQueuePrefix) {
        this.tempQueuePrefix = tempQueuePrefix;
    }

    /**
     * Returns the prefix applied to Temporary Topics that are created by the client.
     *
     * @return the currently configured Temporary Topic prefix.
     */
    public String getTempTopicPrefix() {
        return tempTopicPrefix;
    }

    public void setTempTopicPrefix(String tempTopicPrefix) {
        this.tempTopicPrefix = tempTopicPrefix;
    }

    /**
     * Returns the prefix applied to Topics that are created by the client.
     *
     * @return the currently configured Topic prefix.
     */
    public String getTopicPrefix() {
        return topicPrefix;
    }

    public void setTopicPrefix(String topicPrefix) {
        this.topicPrefix = topicPrefix;
    }

    /**
     * Gets the currently set close timeout.
     *
     * @return the currently set close timeout.
     */
    public long getCloseTimeout() {
        return closeTimeout;
    }

    /**
     * Sets the close timeout used to control how long a Connection close will wait for
     * clean shutdown of the connection before giving up. A negative value means wait
     * forever.
     *
     * Care should be taken in that a very short close timeout can cause the client to
     * not cleanly shutdown the connection and its resources.
     *
     * @param closeTimeout
     *        time in milliseconds to wait for a clean connection close.
     */
    public void setCloseTimeout(long closeTimeout) {
        this.closeTimeout = closeTimeout;
    }

    /**
     * Returns the currently configured wire level connect timeout.
     *
     * @return the currently configured wire level connect timeout.
     */
    public long getConnectTimeout() {
        return this.connectTimeout;
    }

    /**
     * Sets the timeout value used to control how long a client will wait for a successful
     * connection to the remote peer to be established before considering the attempt to
     * have failed. This value does not control socket level connection timeout but rather
     * connection handshake at the wire level, to control the socket level timeouts use the
     * standard socket options configuration values.
     *
     * @param connectTimeout
     *        the time in milliseconds to wait for the protocol connection handshake to complete.
     */
    public void setConnectTimeout(long connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    public long getSendTimeout() {
        return sendTimeout;
    }

    public void setSendTimeout(long sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    public long getRequestTimeout() {
        return requestTimeout;
    }

    public void setRequestTimeout(long requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    public JmsPrefetchPolicy getPrefetchPolicy() {
        return prefetchPolicy;
    }

    public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
        this.prefetchPolicy = prefetchPolicy;
    }

    public String getClientIDPrefix() {
        return clientIDPrefix;
    }

    /**
     * Sets the prefix used by auto-generated JMS Client ID values which are used if the JMS
     * client does not explicitly specify one.
     *
     * @param clientIDPrefix
     *        the prefix handed to the client id generator when it is created.
     */
    public void setClientIDPrefix(String clientIDPrefix) {
        this.clientIDPrefix = clientIDPrefix;
    }

    /**
     * Returns the client id generator, creating it on first use with the configured
     * client id prefix if there is one.
     *
     * @return the generator used for client ids.
     */
    protected synchronized IdGenerator getClientIdGenerator() {
        if (clientIdGenerator == null) {
            if (clientIDPrefix != null) {
                clientIdGenerator = new IdGenerator(clientIDPrefix);
            } else {
                clientIdGenerator = new IdGenerator();
            }
        }
        return clientIdGenerator;
    }

    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
        this.clientIdGenerator = clientIdGenerator;
    }

    /**
     * Sets the prefix used by connection id generator.
     *
     * @param connectionIDPrefix
     *        The string prefix used on all connection Id's created by this factory.
     */
    public void setConnectionIDPrefix(String connectionIDPrefix) {
        this.connectionIDPrefix = connectionIDPrefix;
    }

    /**
     * Gets the currently configured JMS ExceptionListener that will be set on all
     * new Connection objects created from this factory.
     *
     * @return the currently configured JMS ExceptionListener.
     */
    public ExceptionListener getExceptionListener() {
        return exceptionListener;
    }

    /**
     * Sets the JMS ExceptionListener that will be set on all new Connection objects
     * created from this factory.
     *
     * @param exceptionListener
     *        the JMS ExceptionListener to apply to new Connections or null to clear.
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * Indicates if the Connections created from this factory will watch for updates
     * from the remote peer informing of temporary destination creation and destruction.
     *
     * @return true if destination monitoring is enabled.
     */
    public boolean isWatchRemoteDestinations() {
        return watchRemoteDestinations;
    }

    /**
     * Enable or disable monitoring of remote temporary destination life-cycles.
     *
     * @param watchRemoteDestinations
     *        true if connection instances should monitor remote destination life-cycles.
     */
    public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
        this.watchRemoteDestinations = watchRemoteDestinations;
    }

    /**
     * Returns true if the client should always send messages using a synchronous
     * send operation regardless of persistence mode, or inside a transaction.
     *
     * @return true if sends should always be done synchronously.
     */
    public boolean isAlwaysSyncSend() {
        return alwaysSyncSend;
    }

    /**
     * Configures whether or not the client will always send messages synchronously or not
     * regardless of other factors that might result in an asynchronous send.
     *
     * @param alwaysSyncSend
     *        if true sends are always done synchronously.
     */
    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
        this.alwaysSyncSend = alwaysSyncSend;
    }

    /**
     * @return true if consumer acknowledgments are sent asynchronously.
     */
    public boolean isSendAcksAsync() {
        return sendAcksAsync;
    }

    /**
     * Should the message acknowledgments from a consumer be sent synchronously or
     * asynchronously. Sending the acknowledgments asynchronously can increase the
     * performance of a consumer but opens up the possibility of a missed message
     * acknowledge should the connection be unstable.
     *
     * @param sendAcksAsync
     *        true to have the client send all message acknowledgments asynchronously.
     */
    public void setSendAcksAsync(boolean sendAcksAsync) {
        this.sendAcksAsync = sendAcksAsync;
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java new file mode 100644 index 0000000..2439760 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.qpid.jms;

import java.net.URI;

import org.apache.qpid.jms.message.JmsInboundMessageDispatch;

/**
 * Provides an interface for clients to listen to events related to
 * a JmsConnection.
 */
public interface JmsConnectionListener {

    /**
     * Called when an unrecoverable error occurs and the Connection must be closed.
     *
     * @param error
     *        The error that triggered the failure.
     */
    void onConnectionFailure(Throwable error);

    /**
     * Called when the Connection to the remote peer is lost.
     *
     * @param remoteURI
     *        The URI of the Broker previously connected to.
     */
    void onConnectionInterrupted(URI remoteURI);

    /**
     * Called when normal communication has been restored to a remote peer.
     *
     * @param remoteURI
     *        The URI of the Broker that this client is now connected to.
     */
    void onConnectionRestored(URI remoteURI);

    /**
     * Called when a Connection is notified that a new Message has arrived for
     * one of its currently active subscriptions.
     *
     * @param envelope
     *        The envelope that contains the incoming message and its delivery information.
     */
    void onMessage(JmsInboundMessageDispatch envelope);

}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java new file mode 100644 index 0000000..f674320 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Enumeration; +import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.jms.ConnectionMetaData; + +/** + * A <CODE>ConnectionMetaData</CODE> object provides information describing + * the <CODE>Connection</CODE> object. 
+ */
public final class JmsConnectionMetaData implements ConnectionMetaData {

    public static final String PROVIDER_VERSION;
    public static final int PROVIDER_MAJOR_VERSION;
    public static final int PROVIDER_MINOR_VERSION;

    public static final JmsConnectionMetaData INSTANCE = new JmsConnectionMetaData();

    // Declared ahead of the static block below, which uses it during class initialization.
    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+).*");

    static {
        String version = null;
        try {
            Package p = JmsConnectionMetaData.class.getPackage();
            if (p != null) {
                version = p.getImplementationVersion();
            }
        } catch (Throwable ignored) {
            // Best effort only: fall through to the bundled version resource.
        }

        // Consult the resource whenever the manifest gave no version, not only when
        // the manifest lookup happened to fail with an exception.
        if (version == null) {
            version = readVersionResource();
        }

        int major = 0;
        int minor = 0;
        if (version != null) {
            Matcher m = VERSION_PATTERN.matcher(version);
            if (m.matches()) {
                try {
                    int parsedMajor = Integer.parseInt(m.group(1));
                    int parsedMinor = Integer.parseInt(m.group(2));
                    major = parsedMajor;
                    minor = parsedMinor;
                } catch (NumberFormatException ignored) {
                    // Digits that do not fit an int: report 0.0 rather than fail class loading.
                }
            }
        }

        PROVIDER_VERSION = version;
        PROVIDER_MAJOR_VERSION = major;
        PROVIDER_MINOR_VERSION = minor;
    }

    private JmsConnectionMetaData() {}

    /**
     * Reads the first line of the version.txt resource of this package.
     *
     * @return the line read, or null if the resource is missing, empty or unreadable.
     */
    private static String readVersionResource() {
        InputStream in = JmsConnectionMetaData.class.getResourceAsStream("/org/apache/qpid/jms/version.txt");
        if (in == null) {
            return null;
        }

        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            return reader.readLine();
        } catch (Exception ignored) {
            // An unreadable resource simply means the version stays unknown.
            return null;
        } finally {
            try {
                in.close();
            } catch (Exception ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
    }

    /**
     * Gets the JMS API version.
     *
     * @return the JMS API version
     */
    @Override
    public String getJMSVersion() {
        return "1.1";
    }

    /**
     * Gets the JMS major version number.
     *
     * @return the JMS API major version number
     */
    @Override
    public int getJMSMajorVersion() {
        return 1;
    }

    /**
     * Gets the JMS minor version number.
     *
     * @return the JMS API minor version number
     */
    @Override
    public int getJMSMinorVersion() {
        return 1;
    }

    /**
     * Gets the JMS provider name.
     *
     * @return the JMS provider name
     */
    @Override
    public String getJMSProviderName() {
        return "QpidJMS";
    }

    /**
     * Gets the JMS provider version.
     *
     * @return the JMS provider version, or null if it could not be determined.
     */
    @Override
    public String getProviderVersion() {
        return PROVIDER_VERSION;
    }

    /**
     * Gets the JMS provider major version number.
     *
     * @return the JMS provider major version number, 0 if it could not be determined.
     */
    @Override
    public int getProviderMajorVersion() {
        return PROVIDER_MAJOR_VERSION;
    }

    /**
     * Gets the JMS provider minor version number.
     *
     * @return the JMS provider minor version number, 0 if it could not be determined.
     */
    @Override
    public int getProviderMinorVersion() {
        return PROVIDER_MINOR_VERSION;
    }

    /**
     * Gets an enumeration of the JMSX property names.
     *
     * @return an Enumeration of JMSX property names
     */
    @Override
    public Enumeration<String> getJMSXPropertyNames() {
        Vector<String> jmxProperties = new Vector<String>();
        jmxProperties.add("JMSXUserID");
        jmxProperties.add("JMSXGroupID");
        jmxProperties.add("JMSXGroupSeq");
        jmxProperties.add("JMSXDeliveryCount");
        return jmxProperties.elements();
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java new file mode 100644 index 0000000..a86e0f7 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.qpid.jms;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;

import javax.jms.JMSException;

import org.apache.qpid.jms.jndi.JNDIStorable;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.meta.JmsResourceVistor;

/**
 * Jms Destination
 */
public abstract class JmsDestination extends JNDIStorable implements JmsResource, Externalizable, javax.jms.Destination, Comparable<JmsDestination> {

    protected transient String name;
    protected transient boolean topic;
    protected transient boolean temporary;
    protected transient int hashValue;
    protected transient JmsConnection connection;

    /**
     * @param name
     *        the name of the destination.
     * @param topic
     *        true for a Topic, false for a Queue.
     * @param temporary
     *        true for a temporary destination.
     */
    protected JmsDestination(String name, boolean topic, boolean temporary) {
        this.name = name;
        this.topic = topic;
        this.temporary = temporary;
    }

    public abstract JmsDestination copy();

    @Override
    public String toString() {
        return name;
    }

    /**
     * @return name of destination
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name
     *        the new name of the destination.
     */
    public void setName(String name) {
        this.name = name;
        // The hash is derived from the name; drop the cached value so that
        // hashCode() keeps agreeing with equals() after a rename.
        this.hashValue = 0;
    }

    /**
     * @return true if a Topic
     */
    public boolean isTopic() {
        return this.topic;
    }

    /**
     * @return true if a temporary destination
     */
    public boolean isTemporary() {
        return this.temporary;
    }

    /**
     * @return true if a Queue
     */
    public boolean isQueue() {
        return !this.topic;
    }

    /**
     * Restores name, topic flag and temporary flag from the given properties. A missing
     * name becomes the empty string, a missing topic flag true and a missing temporary
     * flag false.
     *
     * @param props
     *        the properties to read from.
     */
    @Override
    protected void buildFromProperties(Map<String, String> props) {
        setName(getProperty(props, "name", ""));
        this.topic = Boolean.parseBoolean(getProperty(props, "topic", Boolean.TRUE.toString()));
        this.temporary = Boolean.parseBoolean(getProperty(props, "temporary", Boolean.FALSE.toString()));
    }

    /**
     * Stores name, topic flag and temporary flag in the given properties.
     *
     * @param props
     *        the properties to write to.
     */
    @Override
    protected void populateProperties(Map<String, String> props) {
        props.put("name", getName());
        props.put("topic", Boolean.toString(isTopic()));
        props.put("temporary", Boolean.toString(isTemporary()));
    }

    /**
     * Orders destinations by their temporary flag first (non-temporary destinations
     * sort before temporary ones) and by name second. A null argument yields -1.
     *
     * @param other
     *        the Object to be compared.
     * @return a negative integer, zero, or a positive integer as this object is
     *         less than, equal to, or greater than the specified object.
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
    @Override
    public int compareTo(JmsDestination other) {
        if (other == null) {
            return -1;
        }
        if (this == other) {
            return 0;
        }
        if (isTemporary() != other.isTemporary()) {
            // The result must flip sign when the operands are swapped; answering -1
            // for both directions breaks sorting and sorted collections.
            return isTemporary() ? 1 : -1;
        }
        return compareNames(getName(), other.getName());
    }

    /** Compares two names, sorting a null name before any non-null name. */
    private static int compareNames(String left, String right) {
        if (left == null) {
            return right == null ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        String mine = getName();
        String theirs = ((JmsDestination) o).getName();
        return mine == null ? theirs == null : mine.equals(theirs);
    }

    @Override
    public int hashCode() {
        if (hashValue == 0) {
            String current = getName();
            hashValue = current == null ? 0 : current.hashCode();
        }
        return hashValue;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(getName());
        out.writeBoolean(isTopic());
        out.writeBoolean(isTemporary());
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        setName(in.readUTF());
        this.topic = in.readBoolean();
        this.temporary = in.readBoolean();
    }

    void setConnection(JmsConnection connection) {
        this.connection = connection;
    }

    JmsConnection getConnection() {
        return this.connection;
    }

    /**
     * Attempts to delete the destination if there is an assigned Connection object.
     *
     * @throws JMSException if an error occurs or the provider doesn't support
     *         delete of destinations from the client.
     */
    protected void tryDelete() throws JMSException {
        if (connection != null) {
            connection.deleteDestination(this);
        }
    }

    @Override
    public void visit(JmsResourceVistor visitor) throws Exception {
        visitor.processDestination(this);
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java new file mode 100644 index 0000000..b7ae1b9 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.meta.JmsConsumerId; + +/** + * Implementation of a TopicSubscriber that is Durable + */ +public class JmsDurableTopicSubscriber extends JmsTopicSubscriber { + + /** + * Creates a durable TopicSubscriber + * + * @param id + * @param s + * @param destination + * @param name + * @param noLocal + * @param selector + * @throws JMSException + */ + public JmsDurableTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException { + super(id, s, destination, name, noLocal, selector); + } + + @Override + public boolean isDurableSubscription() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java new file mode 100644 index 0000000..c9395ba --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; +import org.apache.qpid.jms.meta.JmsTransactionId; +import org.apache.qpid.jms.meta.JmsTransactionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the details of a Session operating inside of a local JMS transaction. + */ +public class JmsLocalTransactionContext { + + private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class); + + private List<JmsTxSynchronization> synchronizations; + private final JmsSession session; + private final JmsConnection connection; + private JmsTransactionId transactionId; + private JmsTransactionListener listener; + + public JmsLocalTransactionContext(JmsSession session) { + this.session = session; + this.connection = session.getConnection(); + } + + /** + * Adds the given Transaction synchronization to the current list. + * + * @param synchronization + * the transaction synchronization to add. + */ + public void addSynchronization(JmsTxSynchronization s) { + if (synchronizations == null) { + synchronizations = new ArrayList<JmsTxSynchronization>(10); + } + synchronizations.add(s); + } + + /** + * Clears the current Transacted state. This is usually done when the client + * detects that a failover has occurred and needs to create a new Transaction + * for a Session that was previously enlisted in a transaction. 
+ */ + public void clear() { + this.transactionId = null; + this.synchronizations = null; + } + + /** + * Start a local transaction. + * + * @throws javax.jms.JMSException on internal error + */ + public void begin() throws JMSException { + if (transactionId == null) { + synchronizations = null; + + transactionId = connection.getNextTransactionId(); + JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId); + connection.createResource(transaction); + + if (listener != null) { + listener.onTransactionStarted(); + } + + LOG.debug("Begin: {}", transactionId); + } + } + + /** + * Rolls back any work done in this transaction and releases any locks + * currently held. + * + * @throws JMSException + * if the JMS provider fails to roll back the transaction due to some internal error. + */ + public void rollback() throws JMSException { + if (transactionId != null) { + LOG.debug("Rollback: {} syncCount: {}", transactionId, + (synchronizations != null ? synchronizations.size() : 0)); + + transactionId = null; + connection.rollback(session.getSessionId()); + + if (listener != null) { + listener.onTransactionRolledBack(); + } + } + + afterRollback(); + } + + /** + * Commits all work done in this transaction and releases any locks + * currently held. + * + * @throws JMSException + * if the JMS provider fails to roll back the transaction due to some internal error. + */ + public void commit() throws JMSException { + if (transactionId != null) { + LOG.debug("Commit: {} syncCount: {}", transactionId, + (synchronizations != null ? 
synchronizations.size() : 0)); + + JmsTransactionId oldTransactionId = this.transactionId; + transactionId = null; + try { + connection.commit(session.getSessionId()); + if (listener != null) { + listener.onTransactionCommitted(); + } + afterCommit(); + } catch (JMSException cause) { + LOG.info("Commit failed for transaction: {}", oldTransactionId); + if (listener != null) { + listener.onTransactionRolledBack(); + } + afterRollback(); + throw cause; + } + } + } + + @Override + public String toString() { + return "JmsLocalTransactionContext{transactionId=" + transactionId + "}"; + } + + //------------- Getters and Setters --------------------------------------// + + public JmsTransactionId getTransactionId() { + return this.transactionId; + } + + public JmsTransactionListener getListener() { + return listener; + } + + public void setListener(JmsTransactionListener listener) { + this.listener = listener; + } + + public boolean isInTransaction() { + return this.transactionId != null; + } + + //------------- Implementation methods -----------------------------------// + + private void afterRollback() throws JMSException { + if (synchronizations == null) { + return; + } + + Throwable firstException = null; + int size = synchronizations.size(); + for (int i = 0; i < size; i++) { + try { + synchronizations.get(i).afterRollback(); + } catch (Throwable thrown) { + LOG.debug("Exception from afterRollback on " + synchronizations.get(i), thrown); + if (firstException == null) { + firstException = thrown; + } + } + } + synchronizations = null; + if (firstException != null) { + throw JmsExceptionSupport.create(firstException); + } + } + + private void afterCommit() throws JMSException { + if (synchronizations == null) { + return; + } + + Throwable firstException = null; + int size = synchronizations.size(); + for (int i = 0; i < size; i++) { + try { + synchronizations.get(i).afterCommit(); + } catch (Throwable thrown) { + LOG.debug("Exception from afterCommit on " + 
synchronizations.get(i), thrown); + if (firstException == null) { + firstException = thrown; + } + } + } + synchronizations = null; + if (firstException != null) { + throw JmsExceptionSupport.create(firstException); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java new file mode 100644 index 0000000..31f53c6 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.MessageConsumer; + +/** + * Marker interface used for MessageConsumer instances that support sending + * a notification event when a message has arrived when the consumer is not + * in asynchronous dispatch mode. 
+ */
+public interface JmsMessageAvailableConsumer {
+
+    /**
+     * Returns the listener that is signalled when a message becomes available
+     * for a synchronous consumer, i.e. when a call to
+     * {@link MessageConsumer#receiveNoWait()} can be made.
+     *
+     * @return the currently configured message available listener instance.
+     */
+    JmsMessageAvailableListener getAvailableListener();
+
+    /**
+     * Installs the listener that is signalled when a message becomes available
+     * for a synchronous consumer, i.e. when a call to
+     * {@link MessageConsumer#receiveNoWait()} can be made.
+     *
+     * @param availableListener
+     *        the JmsMessageAvailableListener instance to signal.
+     */
+    void setAvailableListener(JmsMessageAvailableListener availableListener);
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
