Add a JMS Connection wrapper and close dangling connections on destroy.
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/43dd894b Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/43dd894b Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/43dd894b Branch: refs/heads/tomee-1.7.x Commit: 43dd894b04fdb26d705113ace4a93aa294699cea Parents: a4330bf Author: AndyGee <[email protected]> Authored: Wed Nov 18 19:52:59 2015 +0100 Committer: AndyGee <[email protected]> Committed: Wed Nov 18 19:52:59 2015 +0100 ---------------------------------------------------------------------- .../org/apache/openejb/InjectionProcessor.java | 20 +- .../main/java/org/apache/openejb/OpenEJB.java | 6 + .../resource/activemq/ActiveMQ5Factory.java | 8 +- .../activemq/ConnectionFactoryWrapper.java | 66 +++++++ .../resource/activemq/ConnectionWrapper.java | 124 ++++++++++++ .../resource/activemq/SessionWrapper.java | 197 +++++++++++++++++++ .../injection/jms/MessagingBeanTest.java | 8 +- 7 files changed, 424 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java index 054d0c1..5c64757 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/InjectionProcessor.java @@ -20,6 +20,7 @@ package org.apache.openejb; import org.apache.openejb.core.ivm.naming.JndiUrlReference; import org.apache.openejb.injection.FallbackPropertyInjector; import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper; import 
org.apache.openejb.spi.ContainerSystem; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; @@ -28,6 +29,7 @@ import org.apache.xbean.naming.reference.SimpleReference; import org.apache.xbean.recipe.ObjectRecipe; import org.apache.xbean.recipe.Option; +import javax.jms.ConnectionFactory; import javax.naming.Context; import javax.naming.NamingException; import java.lang.reflect.InvocationTargetException; @@ -97,6 +99,7 @@ public class InjectionProcessor<T> { return instance; } + @SuppressWarnings("unchecked") private void construct() throws OpenEJBException { if (instance != null) { throw new IllegalStateException("Instance already constructed"); @@ -179,6 +182,7 @@ public class InjectionProcessor<T> { } } + @SuppressWarnings("unchecked") private void fillInjectionProperties(final ObjectRecipe objectRecipe) { if (injections == null) { return; @@ -201,7 +205,7 @@ public class InjectionProcessor<T> { clazz = suppliedInstance.getClass(); } - if (context != null) { + if (null != context && null != clazz) { for (final Injection injection : injections) { if (injection.getTarget() == null) { continue; @@ -222,7 +226,13 @@ public class InjectionProcessor<T> { } catch (final NamingException e) { if (value instanceof JndiUrlReference) { try { - value = SystemInstance.get().getComponent(ContainerSystem.class).getJNDIContext() + final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class); + + if(null == containerSystem){ + throw new IllegalStateException("ContainerSystem has not been initialized"); + } + + value = containerSystem.getJNDIContext() .lookup(((JndiUrlReference) value).getJndiName()); } catch (final NamingException e1) { value = null; @@ -240,6 +250,12 @@ public class InjectionProcessor<T> { } if (value != null) { + + if(ConnectionFactory.class.isInstance(value)){ + //Wrap + value = new ConnectionFactoryWrapper(ConnectionFactory.class.cast(value)); + } + final String prefix; if (usePrefix) { prefix 
= injection.getTarget().getName() + "/"; http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java index 32c761e..a37ed38 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/OpenEJB.java @@ -21,6 +21,7 @@ import org.apache.openejb.assembler.classic.DeploymentExceptionManager; import org.apache.openejb.cdi.CdiBuilder; import org.apache.openejb.core.ServerFederation; import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.resource.activemq.ConnectionFactoryWrapper; import org.apache.openejb.spi.ApplicationServer; import org.apache.openejb.spi.Assembler; import org.apache.openejb.spi.ContainerSystem; @@ -261,12 +262,17 @@ public final class OpenEJB { } public static void destroy() { + final Assembler assembler = SystemInstance.get().getComponent(Assembler.class); + if (assembler != null) { assembler.destroy(); } else { SystemInstance.reset(); } + + ConnectionFactoryWrapper.closeConnections(); + instance = null; } http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java index eb34606..177c1ff 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQ5Factory.java @@ 
-87,6 +87,7 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler { final URI uri = new URI(cleanUpUri(brokerURI.getSchemeSpecificPart(), compositeData.getParameters(), params)); broker = BrokerFactory.createBroker(uri); + broker.setUseShutdownHook(false); brokers.put(brokerURI, broker); if (persistenceAdapter != null) { @@ -116,9 +117,14 @@ public class ActiveMQ5Factory implements BrokerFactoryHandler { try { final ContainerSystem containerSystem = SystemInstance.get().getComponent(ContainerSystem.class); + + if (null == containerSystem) { + throw new IllegalArgumentException("ContainerSystem has not been initialized"); + } + final Context context = containerSystem.getJNDIContext(); final Object obj = context.lookup("openejb/Resource/" + resouceId); - if (!(obj instanceof DataSource)) { + if (!DataSource.class.isInstance(obj)) { throw new IllegalArgumentException("Resource with id " + resouceId + " is not a DataSource, but is " + obj.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java new file mode 100644 index 0000000..731f286 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionFactoryWrapper.java @@ -0,0 +1,66 @@ +/** + * Tomitribe Confidential + * <p/> + * Copyright(c) Tomitribe Corporation. 2014 + * <p/> + * The source code for this program is not published or otherwise divested + * of its trade secrets, irrespective of what has been deposited with the + * U.S. Copyright Office. 
+ * <p/> + */ +package org.apache.openejb.resource.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ConnectionFactoryWrapper implements ConnectionFactory { + + private static final ArrayList<ConnectionWrapper> connections = new ArrayList<ConnectionWrapper>(); + + private final ConnectionFactory factory; + + public ConnectionFactoryWrapper(final ConnectionFactory factory) { + this.factory = factory; + } + + @Override + public Connection createConnection() throws JMSException { + return getConnection(factory.createConnection()); + } + + @Override + public Connection createConnection(final String userName, final String password) throws JMSException { + return getConnection(factory.createConnection(userName, password)); + } + + private static Connection getConnection(final Connection connection) { + final ConnectionWrapper wrapper = new ConnectionWrapper(connection); + connections.add(wrapper); + return wrapper; + } + + protected static void remove(final ConnectionWrapper connectionWrapper) { + connections.remove(connectionWrapper); + } + + public static void closeConnections() { + final Iterator<ConnectionWrapper> iterator = connections.iterator(); + + while (iterator.hasNext()) { + final ConnectionWrapper next = iterator.next(); + iterator.remove(); + try { + next.close(); + } catch (final Exception e) { + //no-op + } finally { + Logger.getLogger(ConnectionFactoryWrapper.class.getName()).log(Level.SEVERE, "Closed a JMS connection. 
You have an application that fails to close this connection"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java new file mode 100644 index 0000000..6dd90ef --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ConnectionWrapper.java @@ -0,0 +1,124 @@ +/** + * Tomitribe Confidential + * <p/> + * Copyright(c) Tomitribe Corporation. 2014 + * <p/> + * The source code for this program is not published or otherwise divested + * of its trade secrets, irrespective of what has been deposited with the + * U.S. Copyright Office. + * <p/> + */ +package org.apache.openejb.resource.activemq; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.Iterator; + +public class ConnectionWrapper implements Connection { + + private final ArrayList<SessionWrapper> sessions = new ArrayList<SessionWrapper>(); + + private final Connection con; + + public ConnectionWrapper(final Connection con) { + this.con = con; + } + + @Override + public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { + return getSession(con.createSession(transacted, acknowledgeMode)); + } + + private Session getSession(final Session session) { + final SessionWrapper wrapper = new SessionWrapper(this, session); + sessions.add(wrapper); + return wrapper; + } + + protected void remove(final SessionWrapper wrapper) { + sessions.remove(wrapper); + } + + @Override + public String getClientID() throws JMSException { + return con.getClientID(); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(final Topic 
topic, final String subscriptionName, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException { + return con.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages); + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + return con.getExceptionListener(); + } + + @Override + public void setClientID(final String clientID) throws JMSException { + con.setClientID(clientID); + } + + @Override + public ConnectionConsumer createConnectionConsumer(final Destination destination, final String messageSelector, final ServerSessionPool sessionPool, final int maxMessages) throws JMSException { + return con.createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages); + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return con.getMetaData(); + } + + @Override + public void close() throws JMSException { + + final Iterator<SessionWrapper> iterator = sessions.iterator(); + + while (iterator.hasNext()) { + final SessionWrapper next = iterator.next(); + iterator.remove(); + try { + next.close(); + } catch (final Exception e) { + //no-op + } + } + + try { + con.close(); + } finally { + ConnectionFactoryWrapper.remove(this); + } + } + + @Override + public void stop() throws JMSException { + con.stop(); + } + + @Override + public void setExceptionListener(final ExceptionListener listener) throws JMSException { + con.setExceptionListener(listener); + } + + @Override + public void start() throws JMSException { + con.start(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final ConnectionWrapper that = (ConnectionWrapper) o; + + return con.equals(that.con); + + } + + @Override + public int hashCode() { + return con.hashCode(); + } +} 
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java new file mode 100644 index 0000000..f7f5ef8 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/SessionWrapper.java @@ -0,0 +1,197 @@ +/** + * Tomitribe Confidential + * <p/> + * Copyright(c) Tomitribe Corporation. 2014 + * <p/> + * The source code for this program is not published or otherwise divested + * of its trade secrets, irrespective of what has been deposited with the + * U.S. Copyright Office. + * <p/> + */ +package org.apache.openejb.resource.activemq; + +import javax.jms.*; +import java.io.Serializable; + +public class SessionWrapper implements Session { + + private final ConnectionWrapper connectionWrapper; + private final Session session; + + public SessionWrapper(final ConnectionWrapper connectionWrapper, final Session session) { + this.connectionWrapper = connectionWrapper; + this.session = session; + } + + @Override + public BytesMessage createBytesMessage() throws JMSException { + return session.createBytesMessage(); + } + + @Override + public TextMessage createTextMessage() throws JMSException { + return session.createTextMessage(); + } + + @Override + public Message createMessage() throws JMSException { + return session.createMessage(); + } + + @Override + public boolean getTransacted() throws JMSException { + return session.getTransacted(); + } + + @Override + public void rollback() throws JMSException { + session.rollback(); + } + + @Override + public MessageConsumer createConsumer(final Destination destination, final String messageSelector, final boolean NoLocal) throws JMSException 
{ + return session.createConsumer(destination, messageSelector, NoLocal); + } + + @Override + public QueueBrowser createBrowser(final Queue queue) throws JMSException { + return session.createBrowser(queue); + } + + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException { + return session.createTemporaryQueue(); + } + + @Override + public MapMessage createMapMessage() throws JMSException { + return session.createMapMessage(); + } + + @Override + public MessageConsumer createConsumer(final Destination destination) throws JMSException { + return session.createConsumer(destination); + } + + @Override + public void close() throws JMSException { + try { + session.close(); + } finally { + this.connectionWrapper.remove(this); + } + } + + @Override + public void unsubscribe(final String name) throws JMSException { + session.unsubscribe(name); + } + + @Override + public ObjectMessage createObjectMessage(final Serializable object) throws JMSException { + return session.createObjectMessage(object); + } + + @Override + public void run() { + session.run(); + } + + @Override + public void recover() throws JMSException { + session.recover(); + } + + @Override + public void commit() throws JMSException { + session.commit(); + } + + @Override + public int getAcknowledgeMode() throws JMSException { + return session.getAcknowledgeMode(); + } + + @Override + public TextMessage createTextMessage(final String text) throws JMSException { + return session.createTextMessage(text); + } + + @Override + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name, final String messageSelector, final boolean noLocal) throws JMSException { + return session.createDurableSubscriber(topic, name, messageSelector, noLocal); + } + + @Override + public ObjectMessage createObjectMessage() throws JMSException { + return session.createObjectMessage(); + } + + @Override + public Topic createTopic(final String topicName) throws JMSException { + return 
session.createTopic(topicName); + } + + @Override + public void setMessageListener(final MessageListener listener) throws JMSException { + session.setMessageListener(listener); + } + + @Override + public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException { + return session.createBrowser(queue, messageSelector); + } + + @Override + public MessageProducer createProducer(final Destination destination) throws JMSException { + return session.createProducer(destination); + } + + @Override + public Queue createQueue(final String queueName) throws JMSException { + return session.createQueue(queueName); + } + + @Override + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException { + return session.createDurableSubscriber(topic, name); + } + + @Override + public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException { + return session.createConsumer(destination, messageSelector); + } + + @Override + public StreamMessage createStreamMessage() throws JMSException { + return session.createStreamMessage(); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return session.getMessageListener(); + } + + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException { + return session.createTemporaryTopic(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final SessionWrapper that = (SessionWrapper) o; + + return connectionWrapper.equals(that.connectionWrapper) && session.equals(that.session); + + } + + @Override + public int hashCode() { + int result = connectionWrapper.hashCode(); + result = 31 * result + session.hashCode(); + return result; + } +} 
http://git-wip-us.apache.org/repos/asf/tomee/blob/43dd894b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java ---------------------------------------------------------------------- diff --git a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java index 2726530..1509541 100644 --- a/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java +++ b/examples/injection-of-connectionfactory/src/test/java/org/superbiz/injection/jms/MessagingBeanTest.java @@ -26,9 +26,10 @@ public class MessagingBeanTest extends TestCase { public void test() throws Exception { - final Context context = EJBContainer.createEJBContainer().getContext(); + final EJBContainer ejbContainer = EJBContainer.createEJBContainer(); + final Context context = ejbContainer.getContext(); - Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages"); + final Messages messages = (Messages) context.lookup("java:global/injection-of-connectionfactory/Messages"); messages.sendMessage("Hello World!"); messages.sendMessage("How are you?"); @@ -37,6 +38,9 @@ public class MessagingBeanTest extends TestCase { assertEquals(messages.receiveMessage(), "Hello World!"); assertEquals(messages.receiveMessage(), "How are you?"); assertEquals(messages.receiveMessage(), "Still spinning?"); + + context.close(); + ejbContainer.close(); } } //END SNIPPET: code
