Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Topic; +import javax.jms.TopicPublisher; + +/** + * + */ +public class PooledTopicPublisher extends PooledProducer implements TopicPublisher { + + public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException { + super(messageProducer, destination); + } + + public Topic getTopic() throws JMSException { + return getTopicPublisher().getTopic(); + } + + public void publish(Message message) throws JMSException { + getTopicPublisher().publish((Topic) getDestination(), message); + } + + public void publish(Message message, int i, int i1, long l) throws JMSException { + getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l); + } + + public void publish(Topic topic, Message message) throws JMSException { + getTopicPublisher().publish(topic, message); + } + + public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException { + getTopicPublisher().publish(topic, message, i, i1, l); + } + + protected TopicPublisher getTopicPublisher() { + return (TopicPublisher) getMessageProducer(); + } +}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; + +import org.apache.commons.pool.ObjectPoolFactory; +import org.apache.geronimo.transaction.manager.WrapperNamedXAResource; + +public class RecoverableConnectionPool extends XaConnectionPool { + + private String name; + + public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException { + super(connection, poolFactory, transactionManager); + this.name = name; + } + + protected XAResource createXaResource(PooledSession session) throws JMSException { + XAResource xares = ((XASession) session.getInternalSession()).getXAResource(); + if (name != null) { + xares = new WrapperNamedXAResource(xares, name); + } + return xares; + } +} Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +/** + * A cache key for the session details + * + * + */ +public class SessionKey { + private boolean transacted; + private int ackMode; + private int hash; + + public SessionKey(boolean transacted, int ackMode) { + this.transacted = transacted; + this.ackMode = ackMode; + hash = ackMode; + if (transacted) { + hash = 31 * hash + 1; + } + } + + public int hashCode() { + return hash; + } + + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that instanceof SessionKey) { + return equals((SessionKey) that); + } + return false; + } + + public boolean equals(SessionKey that) { + return this.transacted == that.transacted && this.ackMode == that.ackMode; + } + + public boolean isTransacted() { + return transacted; + } + + public int getAckMode() { + return ackMode; + } +} Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XAConnection; +import javax.jms.XASession; + +import org.apache.commons.pool.ObjectPool; +import org.apache.commons.pool.PoolableObjectFactory; + +/** + * Represents the session pool for a given JMS connection. + * + * + */ +public class SessionPool implements PoolableObjectFactory { + private ConnectionPool connectionPool; + private SessionKey key; + private ObjectPool sessionPool; + + public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) { + this.connectionPool = connectionPool; + this.key = key; + this.sessionPool = sessionPool; + sessionPool.setFactory(this); + } + + public void close() throws Exception { + if (sessionPool != null) { + sessionPool.close(); + } + sessionPool = null; + } + + public PooledSession borrowSession() throws JMSException { + try { + Object object = getSessionPool().borrowObject(); + return (PooledSession)object; + } catch (JMSException e) { + throw e; + } catch (Exception e) { + throw JMSExceptionSupport.create(e); + } + } + + public void returnSession(PooledSession session) throws JMSException { + // lets check if we are already closed + getConnection(); + try { + connectionPool.onSessionReturned(session); + getSessionPool().returnObject(session); + } catch (Exception e) { + throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e); + } + } + + public void invalidateSession(PooledSession session) throws JMSException { + try { + connectionPool.onSessionInvalidated(session); + getSessionPool().invalidateObject(session); + } catch (Exception e) { + throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e); + } + } + + // PoolableObjectFactory methods + // ------------------------------------------------------------------------- + public Object makeObject() throws Exception { + if (getConnection() instanceof XAConnection) { + return new PooledSession(createXaSession(), this, key.isTransacted()); + } else { + return new PooledSession(createSession(), this, key.isTransacted()); + } + } + + public void destroyObject(Object o) throws Exception { + PooledSession session = (PooledSession)o; + session.getInternalSession().close(); + } + + public boolean validateObject(Object o) { + return true; + } + + public void activateObject(Object o) throws Exception { + } + + public void passivateObject(Object o) throws Exception { + } + + // Implemention methods + // ------------------------------------------------------------------------- + protected ObjectPool getSessionPool() throws JMSException { + if (sessionPool == null) { + throw new JMSException("Already closed"); + } + return sessionPool; + } + + protected Connection getConnection() throws JMSException { + return connectionPool.getConnection(); + } + + protected Session createSession() throws JMSException { + return getConnection().createSession(key.isTransacted(), key.getAckMode()); + } + + protected XASession createXaSession() throws JMSException { + return ((XAConnection)getConnection()).createXASession(); + } + +} Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XAConnection; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; + +import org.apache.commons.pool.ObjectPoolFactory; + +/** + * An XA-aware connection pool. When a session is created and an xa transaction is active, + * the session will automatically be enlisted in the current transaction. + * + * @author gnodet + */ +public class XaConnectionPool extends ConnectionPool { + + private TransactionManager transactionManager; + + public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException { + super(connection, poolFactory); + this.transactionManager = transactionManager; + } + + public Session createSession(boolean transacted, int ackMode) throws JMSException { + PooledSession session = null; + try { + boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); + if (isXa) { + transacted = true; + ackMode = Session.SESSION_TRANSACTED; + session = (PooledSession) super.createXaSession(transacted, ackMode); + session.setIgnoreClose(true); + session.setIsXa(true); + transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); + incrementReferenceCount(); + transactionManager.getTransaction().enlistResource(createXaResource(session)); + } else { + session = (PooledSession) super.createSession(transacted, ackMode); + session.setIgnoreClose(false); + } + return session; + } catch (RollbackException e) { + final JMSException jmsException = new JMSException("Rollback Exception"); + jmsException.initCause(e); + throw jmsException; + } catch (SystemException e) { + final JMSException jmsException = new JMSException("System Exception"); + jmsException.initCause(e); + throw jmsException; + } + } + + protected XAResource createXaResource(PooledSession session) throws JMSException { + return session.getXAResource(); + } + + + protected class Synchronization implements javax.transaction.Synchronization { + private final PooledSession session; + + private Synchronization(PooledSession session) { + this.session = session; + } + + public void beforeCompletion() { + } + + public void afterCompletion(int status) { + try { + // This will return session to the pool. + session.setIgnoreClose(false); + session.close(); + session.setIgnoreClose(true); + session.setIsXa(false); + decrementReferenceCount(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + +} Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Tue Jul 31 17:08:04 2012 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aries.transaction.jms.internal; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.transaction.TransactionManager; + +import org.apache.aries.transaction.jms.PooledConnectionFactory; + +/** + * A pooled connection factory that automatically enlists + * sessions in the current active XA transaction if any. + */ +public class XaPooledConnectionFactory extends PooledConnectionFactory { + + private XAConnectionFactory xaConnectionFactory; + private TransactionManager transactionManager; + + public XaPooledConnectionFactory() { + super(); + } + + public XAConnectionFactory getXaConnectionFactory() { + return xaConnectionFactory; + } + + public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) { + this.xaConnectionFactory = xaConnectionFactory; + setConnectionFactory(new ConnectionFactory() { + public Connection createConnection() throws JMSException { + return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(); + } + public Connection createConnection(String userName, String password) throws JMSException { + return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password); + } + }); + } + + public TransactionManager getTransactionManager() { + return transactionManager; + } + + /** + * The XA TransactionManager to use to enlist the JMS sessions into. + * + * @org.apache.xbean.Property required=true + */ + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + protected ConnectionPool createConnectionPool(Connection connection) throws JMSException { + return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager()); + } +} Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html (added) +++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html Tue Jul 31 17:08:04 2012 @@ -0,0 +1,26 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<html> +<head> +</head> +<body> + +A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like +Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>. + +</body> +</html> Propchange: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html ------------------------------------------------------------------------------ svn:executable = * Added: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1367676&view=auto ============================================================================== --- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (added) +++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Tue Jul 31 17:08:04 2012 @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. + +See the License for the specific language governing permissions and +limitations under the License. +--> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> + + <service interface="org.apache.aries.blueprint.NamespaceHandler"> + <service-properties> + <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/> + </service-properties> + <bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler"> + <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/> + <argument value="org.apache.aries.transaction.jms.xsd"/> + <argument ref="blueprintBundle"/> + <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/> + </bean> + </service> + +</blueprint>
