| mattrpav commented on code in PR #729: URL: https://github.com/apache/activemq/pull/729#discussion_r910180019 ########## activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java: ########## @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateRuntimeException; +import javax.jms.JMSConsumer; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.JMSProducer; +import javax.jms.JMSRuntimeException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.util.JMSExceptionSupport; + +/** + * In terms of the JMS 1.1 API a JMSContext should be thought of as + * representing both a Connection and a Session. Although the simplified + * API removes the need for applications to use those objects, the concepts + * of connection and session remain important. A connection represents a + * physical link to the JMS server and a session represents a + * single-threaded context for sending and receiving messages. + * + * + * @see javax.jms.JMSContext + */ + +public class ActiveMQContext implements JMSContext { + + private static final boolean DEFAULT_AUTO_START = true; + + private final ActiveMQConnection activemqConnection; + private final AtomicLong connectionCounter; + private ActiveMQSession activemqSession = null; + + // Configuration + private boolean autoStart = DEFAULT_AUTO_START; + private final int sessionMode; + + // State + private boolean closeInvoked = false; + private final AtomicBoolean startInvoked = new AtomicBoolean(false); + private ActiveMQMessageProducer activemqMessageProducer = null; + + ActiveMQContext(final ActiveMQConnection activemqConnection) { + this.activemqConnection = activemqConnection; + this.sessionMode = AUTO_ACKNOWLEDGE; + this.connectionCounter = new AtomicLong(1l); + } + + ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode) { + this.activemqConnection = activemqConnection; + this.sessionMode = sessionMode; + this.connectionCounter = new AtomicLong(1l); + } + + private ActiveMQContext(final ActiveMQConnection activemqConnection, final int sessionMode, final AtomicLong connectionCounter) { + this.activemqConnection = activemqConnection; + this.sessionMode = sessionMode; + this.connectionCounter = connectionCounter; + } + + @Override + public JMSContext createContext(int sessionMode) { + if(connectionCounter.get() == 0l) { + throw new JMSRuntimeException("Context already closed"); + } + + connectionCounter.incrementAndGet(); + return new ActiveMQContext(activemqConnection, sessionMode, connectionCounter); + } + + @Override + public JMSProducer createProducer() { + return new ActiveMQProducer(this, getCreatedActiveMQMessageProducer()); + } + + @Override + public String getClientID() { + try { + return this.activemqConnection.getClientID(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public void setClientID(String clientID) { + try { + this.activemqConnection.setClientID(clientID); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public ConnectionMetaData getMetaData() { + checkContextState(); + try { + return this.activemqConnection.getMetaData(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public ExceptionListener getExceptionListener() { + checkContextState(); + try { + return this.activemqConnection.getExceptionListener(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public void setExceptionListener(ExceptionListener listener) { + checkContextState(); + try { + this.activemqConnection.setExceptionListener(listener); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public void start() { + checkContextState(); + try { + if(startInvoked.compareAndSet(false, true)) { + this.activemqConnection.start(); + } + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public void stop() { + checkContextState(); + try { + if(startInvoked.compareAndSet(true, false)) { + this.activemqConnection.stop(); + } + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public void setAutoStart(boolean autoStart) { + this.autoStart = autoStart; + } + + @Override + public boolean getAutoStart() { + return this.autoStart; + } + + @Override + public void close() { + JMSRuntimeException firstException = null; + + if(this.activemqMessageProducer != null) { + try { + this.activemqMessageProducer.close(); + } catch (JMSException e) { + if(firstException == null) { + firstException = JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + } + + if(this.activemqSession != null) { + try { + this.activemqSession.close(); + } catch (JMSException e) { + if(firstException == null) { + firstException = JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + } + + if(connectionCounter.decrementAndGet() == 0) { + if(this.activemqConnection != null) { + try { + closeInvoked = true; + this.activemqConnection.close(); + } catch (JMSException e) { + if(firstException == null) { + firstException = JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + } + } + + if(firstException != null) { + throw firstException; + } + } + + @Override + public BytesMessage createBytesMessage() { + checkContextState(); + try { + return activemqSession.createBytesMessage(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public MapMessage createMapMessage() { + checkContextState(); + try { + return activemqSession.createMapMessage(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public Message createMessage() { + checkContextState(); + try { + return activemqSession.createMessage(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public ObjectMessage createObjectMessage() { + checkContextState(); + try { + return activemqSession.createObjectMessage(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public ObjectMessage createObjectMessage(Serializable object) { + checkContextState(); + try { + return activemqSession.createObjectMessage(object); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public StreamMessage createStreamMessage() { + checkContextState(); + try { + return activemqSession.createStreamMessage(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public TextMessage createTextMessage() { + checkContextState(); + try { + return activemqSession.createTextMessage(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public TextMessage createTextMessage(String text) { + checkContextState(); + try { + return activemqSession.createTextMessage(text); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public boolean getTransacted() { + checkContextState(); + try { + return activemqSession.getTransacted(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public int getSessionMode() { + return this.sessionMode; + } + + @Override + public void commit() { + checkContextState(); + try { + activemqSession.commit(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public void rollback() { + checkContextState(); + try { + activemqSession.rollback(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public void recover() { + checkContextState(); + try { + activemqSession.recover(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public JMSConsumer createConsumer(Destination destination) { + checkContextState(); + try { + if(getAutoStart()) { + start(); + } + return new ActiveMQConsumer(this, activemqSession.createConsumer(destination)); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public JMSConsumer createConsumer(Destination destination, String messageSelector) { + checkContextState(); + try { + if(getAutoStart()) {+ start();+ } + return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector)); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) { + checkContextState(); + try { + if(getAutoStart()) { + start(); + } + return new ActiveMQConsumer(this, activemqSession.createConsumer(destination, messageSelector, noLocal)); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public Queue createQueue(String queueName) { + checkContextState(); + try { + return activemqSession.createQueue(queueName); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public Topic createTopic(String topicName) { + checkContextState(); + try { + return activemqSession.createTopic(topicName); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public JMSConsumer createDurableConsumer(Topic topic, String name) { + throw new UnsupportedOperationException("createDurableConsumer(topic, name) is not supported"); + } + + @Override + public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) { + throw new UnsupportedOperationException("createDurableConsumer(topic, name, messageSelector, noLocal) is not supported"); + } + + @Override + public JMSConsumer createSharedDurableConsumer(Topic topic, String name) { + throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name) is not supported"); + } + + @Override + public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) { + throw new UnsupportedOperationException("createDurableConsumer(topic, name, messageSelector) is not supported"); + } + + @Override + public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) { + throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName) is not supported"); + } + + @Override + public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) { + throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName, messageSelector) is not supported"); + } + + @Override + public QueueBrowser createBrowser(Queue queue) { + checkContextState(); + try { + if(getAutoStart()) { + start(); + } + return activemqSession.createBrowser(queue); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) { + checkContextState(); + try { + if(getAutoStart()) {+ start();+ } + return activemqSession.createBrowser(queue, messageSelector); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public TemporaryQueue createTemporaryQueue() { + checkContextState(); + try { + return activemqSession.createTemporaryQueue(); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public TemporaryTopic createTemporaryTopic() { + checkContextState(); + try { + return activemqSession.createTemporaryTopic(); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + } + + @Override + public void unsubscribe(String name) { + checkContextState(); + try { + activemqSession.unsubscribe(name); + } catch (JMSException e) {+ throw JMSExceptionSupport.convertToJMSRuntimeException(e);+ } + } + + @Override + public void acknowledge() { + throw new UnsupportedOperationException("acknowledge() is not supported"); Review Comment: This is complete |