Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Thu Jun 12 13:00:02 2014 @@ -23,34 +23,42 @@ import java.util.concurrent.CopyOnWriteA import javax.jms.*; import javax.transaction.xa.XAResource; +import org.apache.commons.pool.KeyedObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PooledSession implements Session, TopicSession, QueueSession { +public class PooledSession implements Session, TopicSession, QueueSession, XASession { private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); + private final SessionKey key; + private final KeyedObjectPool<SessionKey, PooledSession> sessionPool; + private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); + private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); + private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>(); + + private MessageProducer producer; + private TopicPublisher publisher; + private QueueSender sender; + private Session session; - private SessionPool sessionPool; - private MessageProducer messageProducer; - private QueueSender queueSender; - private TopicPublisher topicPublisher; private boolean transactional = true; private boolean ignoreClose; - - private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); - private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); - private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners = - new CopyOnWriteArrayList<PooledSessionEventListener>(); private boolean isXa; + private boolean useAnonymousProducers = true; - public PooledSession(Session session, SessionPool sessionPool, boolean transactional) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) { + this.key = key; this.session = session; this.sessionPool = sessionPool; this.transactional = transactional; + this.useAnonymousProducers = anonymous; } - public void addTempDestEventListener(PooledSessionEventListener listener) { - this.tempDestEventListeners.add(listener); + public void addSessionEventListener(PooledSessionEventListener listener) { + // only add if really needed + if (!sessionEventListeners.contains(listener)) { + this.sessionEventListeners.add(listener); + } } protected boolean isIgnoreClose() { @@ -61,10 +69,9 @@ public class PooledSession implements Se this.ignoreClose = ignoreClose; } + @Override public void close() throws JMSException { if (!ignoreClose) { - // TODO a cleaner way to reset?? - boolean invalidate = false; try { // lets reset the session @@ -95,11 +102,15 @@ public class PooledSession implements Se } finally { consumers.clear(); browsers.clear(); + for (PooledSessionEventListener listener : this.sessionEventListeners) { + listener.onSessionClosed(this); + } + sessionEventListeners.clear(); } if (invalidate) { - // lets close the session and not put the session back into - // the pool + // lets close the session and not put the session back into the pool + // instead invalidate it so the pool can create a new one on demand. if (session != null) { try { session.close(); @@ -108,114 +119,145 @@ public class PooledSession implements Se } session = null; } - sessionPool.invalidateSession(this); + try { + sessionPool.invalidateObject(key, this); + } catch (Exception e) { + LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); + } } else { - sessionPool.returnSession(this); + try { + sessionPool.returnObject(key, this); + } catch (Exception e) { + javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); + illegalStateException.initCause(e); + throw illegalStateException; + } } } } + @Override public void commit() throws JMSException { getInternalSession().commit(); } + @Override public BytesMessage createBytesMessage() throws JMSException { return getInternalSession().createBytesMessage(); } + @Override public MapMessage createMapMessage() throws JMSException { return getInternalSession().createMapMessage(); } + @Override public Message createMessage() throws JMSException { return getInternalSession().createMessage(); } + @Override public ObjectMessage createObjectMessage() throws JMSException { return getInternalSession().createObjectMessage(); } + @Override public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { return getInternalSession().createObjectMessage(serializable); } + @Override public Queue createQueue(String s) throws JMSException { return getInternalSession().createQueue(s); } + @Override public StreamMessage createStreamMessage() throws JMSException { return getInternalSession().createStreamMessage(); } + @Override public TemporaryQueue createTemporaryQueue() throws JMSException { TemporaryQueue result; result = getInternalSession().createTemporaryQueue(); // Notify all of the listeners of the created temporary Queue. - for (PooledSessionEventListener listener : this.tempDestEventListeners) { + for (PooledSessionEventListener listener : this.sessionEventListeners) { listener.onTemporaryQueueCreate(result); } return result; } + @Override public TemporaryTopic createTemporaryTopic() throws JMSException { TemporaryTopic result; result = getInternalSession().createTemporaryTopic(); // Notify all of the listeners of the created temporary Topic. - for (PooledSessionEventListener listener : this.tempDestEventListeners) { + for (PooledSessionEventListener listener : this.sessionEventListeners) { listener.onTemporaryTopicCreate(result); } return result; } + @Override public void unsubscribe(String s) throws JMSException { getInternalSession().unsubscribe(s); } + @Override public TextMessage createTextMessage() throws JMSException { return getInternalSession().createTextMessage(); } + @Override public TextMessage createTextMessage(String s) throws JMSException { return getInternalSession().createTextMessage(s); } + @Override public Topic createTopic(String s) throws JMSException { return getInternalSession().createTopic(s); } + @Override public int getAcknowledgeMode() throws JMSException { return getInternalSession().getAcknowledgeMode(); } + @Override public boolean getTransacted() throws JMSException { return getInternalSession().getTransacted(); } + @Override public void recover() throws JMSException { getInternalSession().recover(); } + @Override public void rollback() throws JMSException { getInternalSession().rollback(); } + @Override public XAResource getXAResource() { - if (session == null) { - throw new IllegalStateException("Session is closed"); + if (session instanceof XASession) { + return ((XASession) session).getXAResource(); } - return ((XASession) session).getXAResource(); + return null; } + @Override public Session getSession() { return this; } + @Override public void run() { if (session != null) { session.run(); @@ -224,112 +266,168 @@ public class PooledSession implements Se // Consumer related methods // ------------------------------------------------------------------------- + @Override public QueueBrowser createBrowser(Queue queue) throws JMSException { return addQueueBrowser(getInternalSession().createBrowser(queue)); } + @Override public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); } + @Override public MessageConsumer createConsumer(Destination destination) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination)); } + @Override public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination, selector)); } + @Override public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); } + @Override public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); } + @Override public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); } + @Override public MessageListener getMessageListener() throws JMSException { return getInternalSession().getMessageListener(); } + @Override public void setMessageListener(MessageListener messageListener) throws JMSException { getInternalSession().setMessageListener(messageListener); } + @Override public TopicSubscriber createSubscriber(Topic topic) throws JMSException { return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic)); } + @Override public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local)); } + @Override public QueueReceiver createReceiver(Queue queue) throws JMSException { return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue)); } + @Override public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector)); } // Producer related methods // ------------------------------------------------------------------------- + @Override public MessageProducer createProducer(Destination destination) throws JMSException { - return new PooledProducer(getMessageProducer(), destination); + return new PooledProducer(getMessageProducer(destination), destination); } + @Override public QueueSender createSender(Queue queue) throws JMSException { - return new PooledQueueSender(getQueueSender(), queue); + return new PooledQueueSender(getQueueSender(queue), queue); } + @Override public TopicPublisher createPublisher(Topic topic) throws JMSException { - return new PooledTopicPublisher(getTopicPublisher(), topic); - } - - /** - * Callback invoked when the consumer is closed. - * <p/> - * This is used to keep track of an explicit closed consumer created by this - * session, by which we know do not need to keep track of the consumer, as - * its already closed. - * - * @param consumer - * the consumer which is being closed - */ - protected void onConsumerClose(MessageConsumer consumer) { - consumers.remove(consumer); + return new PooledTopicPublisher(getTopicPublisher(topic), topic); } - public Session getInternalSession() throws JMSException { + public Session getInternalSession() throws IllegalStateException { if (session == null) { - throw new JMSException("The session has already been closed"); + throw new IllegalStateException("The session has already been closed"); } return session; } public MessageProducer getMessageProducer() throws JMSException { - if (messageProducer == null) { - messageProducer = getInternalSession().createProducer(null); + return getMessageProducer(null); + } + + public MessageProducer getMessageProducer(Destination destination) throws JMSException { + MessageProducer result = null; + + if (useAnonymousProducers) { + if (producer == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (producer == null) { + producer = getInternalSession().createProducer(null); + } + } + } + + result = producer; + } else { + result = getInternalSession().createProducer(destination); } - return messageProducer; + + return result; } public QueueSender getQueueSender() throws JMSException { - if (queueSender == null) { - queueSender = ((QueueSession) getInternalSession()).createSender(null); + return getQueueSender(null); + } + + public QueueSender getQueueSender(Queue destination) throws JMSException { + QueueSender result = null; + + if (useAnonymousProducers) { + if (sender == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (sender == null) { + sender = ((QueueSession) getInternalSession()).createSender(null); + } + } + } + + result = sender; + } else { + result = ((QueueSession) getInternalSession()).createSender(destination); } - return queueSender; + + return result; } public TopicPublisher getTopicPublisher() throws JMSException { - if (topicPublisher == null) { - topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null); + return getTopicPublisher(null); + } + + public TopicPublisher getTopicPublisher(Topic destination) throws JMSException { + TopicPublisher result = null; + + if (useAnonymousProducers) { + if (publisher == null) { + // Don't allow for duplicate anonymous producers. + synchronized (this) { + if (publisher == null) { + publisher = ((TopicSession) getInternalSession()).createPublisher(null); + } + } + } + + result = publisher; + } else { + result = ((TopicSession) getInternalSession()).createPublisher(destination); } - return topicPublisher; + + return result; } private QueueBrowser addQueueBrowser(QueueBrowser browser) { @@ -340,10 +438,8 @@ public class PooledSession implements Se private MessageConsumer addConsumer(MessageConsumer consumer) { consumers.add(consumer); // must wrap in PooledMessageConsumer to ensure the onConsumerClose - // method is invoked - // when the returned consumer is closed, to avoid memory leak in this - // session class - // in case many consumers is created + // method is invoked when the returned consumer is closed, to avoid memory + // leak in this session class in case many consumers is created return new PooledMessageConsumer(this, consumer); } @@ -361,7 +457,22 @@ public class PooledSession implements Se this.isXa = isXa; } + @Override public String toString() { return "PooledSession { " + session + " }"; } + + /** + * Callback invoked when the consumer is closed. + * <p/> + * This is used to keep track of an explicit closed consumer created by this + * session, by which we know do not need to keep track of the consumer, as + * its already closed. + * + * @param consumer + * the consumer which is being closed + */ + protected void onConsumerClose(MessageConsumer consumer) { + consumers.remove(consumer); + } }
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Thu Jun 12 13:00:02 2014 @@ -25,7 +25,7 @@ interface PooledSessionEventListener { * Called on successful creation of a new TemporaryQueue. * * @param tempQueue - * The TemporaryQueue just created. + * The TemporaryQueue just created. */ void onTemporaryQueueCreate(TemporaryQueue tempQueue); @@ -33,8 +33,16 @@ interface PooledSessionEventListener { * Called on successful creation of a new TemporaryTopic. * * @param tempTopic - * The TemporaryTopic just created. + * The TemporaryTopic just created. */ void onTemporaryTopicCreate(TemporaryTopic tempTopic); + /** + * Called when the PooledSession is closed. + * + * @param session + * The PooledSession that has been closed. + */ + void onSessionClosed(PooledSession session); + } Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Thu Jun 12 13:00:02 2014 @@ -16,6 +16,7 @@ */ package org.apache.aries.transaction.jms.internal; +import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.XAConnection; import javax.jms.XASession; @@ -29,8 +30,8 @@ public class RecoverableConnectionPool e private String name; - public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException { - super(connection, poolFactory, transactionManager); + public RecoverableConnectionPool(Connection connection, TransactionManager transactionManager, String name) { + super(connection, transactionManager); this.name = name; } Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Thu Jun 12 13:00:02 2014 @@ -22,8 +22,10 @@ package org.apache.aries.transaction.jms * */ public class SessionKey { + private boolean transacted; private int ackMode; + private int hash; public SessionKey(boolean transacted, int ackMode) { Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Thu Jun 12 13:00:02 2014 @@ -16,8 +16,11 @@ */ package org.apache.aries.transaction.jms.internal; +import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; import javax.jms.XAConnection; import javax.transaction.RollbackException; import javax.transaction.Status; @@ -35,28 +38,58 @@ import org.apache.commons.pool.ObjectPoo */ public class XaConnectionPool extends ConnectionPool { - private TransactionManager transactionManager; + private final TransactionManager transactionManager; - public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException { - super(connection, poolFactory); + public XaConnectionPool(Connection connection, TransactionManager transactionManager) { + super(connection); this.transactionManager = transactionManager; } + @Override + protected Session makeSession(SessionKey key) throws JMSException { + return ((XAConnection) connection).createXASession(); + } + + @Override public Session createSession(boolean transacted, int ackMode) throws JMSException { - PooledSession session = null; try { boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); if (isXa) { - transacted = true; - ackMode = Session.SESSION_TRANSACTED; - session = (PooledSession) super.createXaSession(transacted, ackMode); + // if the xa tx aborts inflight we don't want to auto create a + // local transaction or auto ack + transacted = false; + ackMode = Session.CLIENT_ACKNOWLEDGE; + } else if (transactionManager != null) { + // cmt or transactionManager managed + transacted = false; + if (ackMode == Session.SESSION_TRANSACTED) { + ackMode = Session.AUTO_ACKNOWLEDGE; + } + } + PooledSession session = (PooledSession) super.createSession(transacted, ackMode); + if (isXa) { + session.addSessionEventListener(new PooledSessionEventListener() { + + @Override + public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { + } + + @Override + public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { + } + + @Override + public void onSessionClosed(PooledSession session) { + session.setIgnoreClose(true); + session.setIsXa(false); + } + }); session.setIgnoreClose(true); session.setIsXa(true); transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); incrementReferenceCount(); transactionManager.getTransaction().enlistResource(createXaResource(session)); } else { - session = (PooledSession) super.createSession(transacted, ackMode); session.setIgnoreClose(false); } return session; @@ -74,8 +107,7 @@ public class XaConnectionPool extends Co protected XAResource createXaResource(PooledSession session) throws JMSException { return session.getXAResource(); } - - + protected class Synchronization implements javax.transaction.Synchronization { private final PooledSession session; @@ -83,21 +115,20 @@ public class XaConnectionPool extends Co this.session = session; } + @Override public void beforeCompletion() { } - + + @Override public void afterCompletion(int status) { try { // This will return session to the pool. session.setIgnoreClose(false); session.close(); - session.setIgnoreClose(true); - session.setIsXa(false); decrementReferenceCount(); } catch (JMSException e) { throw new RuntimeException(e); } } } - } Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (original) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Thu Jun 12 13:00:02 2014 @@ -16,58 +16,133 @@ */ package org.apache.aries.transaction.jms.internal; +import java.io.Serializable; +import java.util.Hashtable; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; import javax.jms.XAConnection; import javax.jms.XAConnectionFactory; +import javax.naming.Binding; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.Name; +import javax.naming.NamingEnumeration; +import javax.naming.spi.ObjectFactory; import javax.transaction.TransactionManager; import org.apache.aries.transaction.jms.PooledConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A pooled connection factory that automatically enlists * sessions in the current active XA transaction if any. */ -public class XaPooledConnectionFactory extends PooledConnectionFactory { +public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, + Serializable, QueueConnectionFactory, TopicConnectionFactory { - private XAConnectionFactory xaConnectionFactory; + private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class); private TransactionManager transactionManager; - - public XaPooledConnectionFactory() { - super(); + private boolean tmFromJndi = false; + private String tmJndiName = "java:/TransactionManager"; + + public TransactionManager getTransactionManager() { + if (transactionManager == null && tmFromJndi) { + try { + transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName()); + } catch (Throwable ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored); + } + } + } + return transactionManager; } - public XAConnectionFactory getXaConnectionFactory() { - return xaConnectionFactory; + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; } - public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) { - this.xaConnectionFactory = xaConnectionFactory; - setConnectionFactory(new ConnectionFactory() { - public Connection createConnection() throws JMSException { - return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(); - } - public Connection createConnection(String userName, String password) throws JMSException { - return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password); + @Override + protected ConnectionPool createConnectionPool(Connection connection) { + return new XaConnectionPool(connection, getTransactionManager()); + } + + @Override + public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception { + setTmFromJndi(true); + configFromJndiConf(obj); + if (environment != null) { + IntrospectionSupport.setProperties(this, environment); + } + return this; + } + + private void configFromJndiConf(Object rootContextName) { + if (rootContextName instanceof String) { + String name = (String) rootContextName; + name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/')); + try { + InitialContext ctx = new InitialContext(); + NamingEnumeration bindings = ctx.listBindings(name); + + while (bindings.hasMore()) { + Binding bd = (Binding)bindings.next(); + IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject()); + } + + } catch (Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace("exception on config from jndi: " + name, ignored); + } } - }); + } } - public TransactionManager getTransactionManager() { - return transactionManager; + public String getTmJndiName() { + return tmJndiName; + } + + public void setTmJndiName(String tmJndiName) { + this.tmJndiName = tmJndiName; + } + + public boolean isTmFromJndi() { + return tmFromJndi; } /** - * The XA TransactionManager to use to enlist the JMS sessions into. - * - * @org.apache.xbean.Property required=true + * Allow transaction manager resolution from JNDI (ee deployment) + * @param tmFromJndi */ - public void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; + public void setTmFromJndi(boolean tmFromJndi) { + this.tmFromJndi = tmFromJndi; + } + + @Override + public QueueConnection createQueueConnection() throws JMSException { + return (QueueConnection) createConnection(); } - protected ConnectionPool createConnectionPool(Connection connection) throws JMSException { - return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager()); + @Override + public QueueConnection createQueueConnection(String userName, String password) throws JMSException { + return (QueueConnection) createConnection(userName, password); } + + @Override + public TopicConnection createTopicConnection() throws JMSException { + return (TopicConnection) createConnection(); + } + + @Override + public TopicConnection createTopicConnection(String userName, String password) throws JMSException { + return (TopicConnection) createConnection(userName, password); + } + } Modified: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1602148&r1=1602147&r2=1602148&view=diff ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (original) +++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Thu Jun 12 13:00:02 2014 @@ -21,13 +21,13 @@ limitations under the License. <service interface="org.apache.aries.blueprint.NamespaceHandler"> <service-properties> - <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/> + <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/2.0"/> </service-properties> <bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler"> - <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/> + <argument value="http://aries.apache.org/xmlns/transaction-jms/2.0"/> <argument value="org.apache.aries.transaction.jms.xsd"/> <argument ref="blueprintBundle"/> - <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/> + <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/2.0"/> </bean> </service>
