Repository: tomee Updated Branches: refs/heads/master dd28c89b2 -> 456a16d56
better cleaning of our thread after execution Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/456a16d5 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/456a16d5 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/456a16d5 Branch: refs/heads/master Commit: 456a16d5610e79d3fb2ce57ec2362949bee41378 Parents: dd28c89 Author: Romain Manni-Bucau <[email protected]> Authored: Wed Nov 18 15:30:01 2015 -0800 Committer: Romain Manni-Bucau <[email protected]> Committed: Wed Nov 18 15:30:01 2015 -0800 ---------------------------------------------------------------------- .../openejb/assembler/classic/Assembler.java | 14 ++ .../core/security/AbstractSecurityService.java | 8 +- .../core/stateless/StatelessContainer.java | 8 +- .../stateless/StatelessInstanceManager.java | 26 +++ .../GeronimoTransactionManagerFactory.java | 32 ++- .../openejb/testing/ApplicationComposers.java | 11 +- .../activemq/ProperConnectionShutdownTest.java | 227 +++++++++++++++++++ 7 files changed, 321 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java b/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java index 69149ef..7064052 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java @@ -1808,6 +1808,20 @@ public class Assembler extends AssemblerTool implements org.apache.openejb.spi.A systemInstance.removeComponent(EjbResolver.class); systemInstance.fireEvent(new AssemblerDestroyed()); systemInstance.removeObservers(); + + if (DestroyableResource.class.isInstance(this.securityService)) { + DestroyableResource.class.cast(this.securityService).destroyResource(); + } + if (DestroyableResource.class.isInstance(this.transactionManager)) { + DestroyableResource.class.cast(this.transactionManager).destroyResource(); + } + + for (final Container c : this.containerSystem.containers()) { + if (DestroyableResource.class.isInstance(c)) { // TODO: should we use auto closeable there? + DestroyableResource.class.cast(c).destroyResource(); + } + } + SystemInstance.reset(); } finally { l.unlock(); http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java b/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java index c894adc..2c1a798 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java @@ -19,6 +19,7 @@ package org.apache.openejb.core.security; import org.apache.openejb.BeanContext; import org.apache.openejb.InterfaceType; +import org.apache.openejb.api.resource.DestroyableResource; import org.apache.openejb.core.ThreadContext; import org.apache.openejb.core.ThreadContextListener; import org.apache.openejb.core.security.jaas.GroupPrincipal; @@ -58,7 +59,7 @@ import java.util.concurrent.ConcurrentHashMap; * to clients, is mostly secure, and can be deserialized in a client vm without * addition openejb-core classes. */ -public abstract class AbstractSecurityService implements SecurityService<UUID>, ThreadContextListener, BasicPolicyConfiguration.RoleResolver { +public abstract class AbstractSecurityService implements DestroyableResource, SecurityService<UUID>, ThreadContextListener, BasicPolicyConfiguration.RoleResolver { private static final Map<Object, Identity> identities = new ConcurrentHashMap<Object, Identity>(); protected static final ThreadLocal<Identity> clientIdentity = new ThreadLocal<Identity>(); @@ -84,6 +85,11 @@ public abstract class AbstractSecurityService implements SecurityService<UUID>, SystemInstance.get().setComponent(BasicPolicyConfiguration.RoleResolver.class, this); } + @Override + public void destroyResource() { + // no-op + } + public String getRealmName() { return realmName; } http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java index 62bee31..11f43e0 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java @@ -23,6 +23,7 @@ import org.apache.openejb.InterfaceType; import org.apache.openejb.OpenEJBException; import org.apache.openejb.ProxyInfo; import org.apache.openejb.SystemException; +import org.apache.openejb.api.resource.DestroyableResource; import org.apache.openejb.cdi.CurrentCreationalContext; import org.apache.openejb.core.ExceptionType; import org.apache.openejb.core.Operation; @@ -59,7 +60,7 @@ import static org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste /** * @org.apache.xbean.XBean element="statelessContainer" */ -public class StatelessContainer implements org.apache.openejb.RpcContainer { +public class StatelessContainer implements org.apache.openejb.RpcContainer, DestroyableResource { private final ConcurrentMap<Class<?>, List<Method>> interceptorCache = new ConcurrentHashMap<Class<?>, List<Method>>(); private final StatelessInstanceManager instanceManager; @@ -325,4 +326,9 @@ public class StatelessContainer implements org.apache.openejb.RpcContainer { } return annotated; } + + @Override + public void destroyResource() { + this.instanceManager.destroy(); + } } http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java index aa6617c..c8986f7 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java @@ -70,6 +70,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class StatelessInstanceManager { private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); @@ -168,6 +171,29 @@ public class StatelessInstanceManager { } } + public void destroy() { + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.interrupted(); + } + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + try { + if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.interrupted(); + } + } + } + /** * Removes an instance from the pool and returns it for use * by the container in business methods. http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java index 9681b92..5facdd4 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java @@ -19,8 +19,10 @@ package org.apache.openejb.resource; import org.apache.geronimo.transaction.log.HOWLLog; +import org.apache.geronimo.transaction.manager.ExponentialtIntervalRetryScheduler; import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.apache.geronimo.transaction.manager.TransactionLog; +import org.apache.geronimo.transaction.manager.TransactionManagerImpl; import org.apache.geronimo.transaction.manager.WrapperNamedXAResource; import org.apache.geronimo.transaction.manager.XidFactory; import org.apache.geronimo.transaction.manager.XidFactoryImpl; @@ -29,12 +31,16 @@ import org.apache.openejb.api.jmx.Description; import org.apache.openejb.api.jmx.MBean; import org.apache.openejb.api.jmx.ManagedAttribute; import org.apache.openejb.api.jmx.ManagedOperation; +import org.apache.openejb.api.resource.DestroyableResource; import org.apache.openejb.loader.SystemInstance; import org.apache.openejb.monitoring.LocalMBeanServer; import org.apache.openejb.monitoring.ObjectNameBuilder; import org.apache.openejb.util.Duration; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import java.lang.reflect.Field; +import java.util.Timer; import java.util.concurrent.TimeUnit; /** @@ -102,7 +108,7 @@ public class GeronimoTransactionManagerFactory { ((HOWLLog) txLog).doStart(); } - final GeronimoTransactionManager geronimoTransactionManager = new GeronimoTransactionManager(defaultTransactionTimeoutSeconds, xidFactory, txLog); + final GeronimoTransactionManager geronimoTransactionManager = new DestroyableTransactionManager(defaultTransactionTimeoutSeconds, xidFactory, txLog); final ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb.management") .set("j2eeType", "TransactionManager"); LocalMBeanServer.registerDynamicWrapperSilently( @@ -112,6 +118,30 @@ public class GeronimoTransactionManagerFactory { return geronimoTransactionManager; } + public static class DestroyableTransactionManager extends GeronimoTransactionManager implements DestroyableResource { + public DestroyableTransactionManager(final int defaultTransactionTimeoutSeconds, final XidFactory xidFactory, final TransactionLog transactionLog) throws XAException { + super(defaultTransactionTimeoutSeconds, xidFactory, transactionLog); + } + + @Override + public void destroyResource() { + // try to clean up + try { + final Field f = TransactionManagerImpl.class.getDeclaredField("retryScheduler"); + f.setAccessible(true); + final ExponentialtIntervalRetryScheduler rs = ExponentialtIntervalRetryScheduler.class.cast(f.get(this)); + + final Field t = ExponentialtIntervalRetryScheduler.class.getDeclaredField("timer"); + t.setAccessible(true); + + final Timer timer = Timer.class.cast(t.get(rs)); + timer.cancel(); + } catch (final Throwable notImportant) { + // no-op + } + } + } + public static class GeronimoXAResourceWrapper implements XAResourceWrapper { public XAResource wrap(final XAResource xaResource, final String name) { return new WrapperNamedXAResource(xaResource, name); http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java b/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java index 4b83058..ef07bf9 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java @@ -182,7 +182,10 @@ public class ApplicationComposers { testClassFinders.put(this, new ClassFinder(ancestors(klass))); // using this temporary since we don't have yet the instance if (additionalModules != null) { for (final Object o : additionalModules) { - testClassFinders.put(o, new ClassFinder(ancestors(o.getClass()))); + final Class<?> aClass = o.getClass(); + if (aClass != klass) { + testClassFinders.put(o, new ClassFinder(ancestors(aClass))); + } } } @@ -349,7 +352,11 @@ public class ApplicationComposers { annotatedMethods = newAnnotatedMethods; map.put(key, annotatedMethods); } else { - annotatedMethods.addAll(newAnnotatedMethods); + for (final Method m : newAnnotatedMethods) { + if (!annotatedMethods.contains(m)) { + annotatedMethods.add(m); + } + } } } return map; http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java new file mode 100644 index 0000000..49d9a27 --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java @@ -0,0 +1,227 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.resource.activemq; + +import org.apache.openejb.jee.WebApp; +import org.apache.openejb.junit.DeployApplication; +import org.apache.openejb.testing.ApplicationComposers; +import org.apache.openejb.testing.Classes; +import org.apache.openejb.testing.Configuration; +import org.apache.openejb.testing.Module; +import org.apache.openejb.testng.PropertiesBuilder; +import org.apache.openejb.util.Join; +import org.apache.openejb.util.NetworkUtil; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runners.model.Statement; + +import javax.annotation.Resource; +import javax.ejb.EJB; +import javax.ejb.Stateless; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +// inspired from MessagingBeanTest in examples +public class ProperConnectionShutdownTest { + @Test + @Ignore("https://issues.apache.org/jira/browse/AMQ-6051") + public void run() throws Throwable { + final Thread[] threadsBefore = listThreads(); + final AtomicReference<Thread[]> threadWhile = new AtomicReference<>(); + + // run test + final Statement testInContainer = new Statement() { + @Override + public void evaluate() throws Throwable { + messages.sendMessage("Hello World!"); + messages.sendMessage("How are you?"); + + threadWhile.set(listThreads()); + + messages.sendMessage("Still spinning?"); + + assertEquals(messages.receiveMessage(), "Hello World!"); + assertEquals(messages.receiveMessage(), "How are you?"); + assertEquals(messages.receiveMessage(), "Still spinning?"); + + /* TODO: activate it when AMQ-6051 is fixed + + // all worked, now hold a connection + new Thread(new Runnable() { // not daemon! + @Override + public void run() { + messages.blockConnection(); // oops, I forgot to close it + } + }).start(); + */ + } + }; + new DeployApplication(this, testInContainer, new ApplicationComposers(this)).evaluate(); + + Thread.sleep(2250); // AMQ state (started) polling for transport thread is 1s + while (Join.join("", listThreads()).contains("ActiveMQ Session Task")) { // let few sec to AMQ to leave the holding task + Thread.sleep(1000); + } + + // ensure no connection are leaking + final Thread[] threadsAfter = listThreads(); + + int countAMQ = 0; + int countOthers = 0; + for (final Thread t : threadsAfter) { + if (!t.isAlive()) { + continue; + } + if (t.getName().contains("AMQ") || t.getName().toLowerCase(Locale.ENGLISH).contains("activemq")) { + countAMQ++; + } else { + countOthers++; + } + } + + final String debugMessage = Join.join(", ", threadsAfter); + + assertEquals(debugMessage, 0, countAMQ); + + // geronimo libs spawn 2 threads we know: PoolIdleReleaseTimer and CurrentTime so we can get initial + 2 threads there + assertTrue(debugMessage, countOthers <= threadsBefore.length + 2); + } + + private Thread[] listThreads() { + final Thread[] threads = new Thread[Thread.activeCount()]; + final int count = Thread.enumerate(threads); + if (count < threads.length) { + final Thread[] copy = new Thread[count]; + System.arraycopy(threads, 0, copy, 0, count); + return copy; + } + return threads; + } + + @EJB + private Messages messages; + + @Configuration + public Properties config() { + return new PropertiesBuilder() + .p("Default JMS Resource Adapter.BrokerXmlConfig", "broker:(tcp://localhost:" + NetworkUtil.getNextAvailablePort() + ")?useJmx=false") + .build(); + } + + @Module + @Classes(innerClassesAsBean = true) + public WebApp app() { + return new WebApp(); + } + + @Stateless + public static class Messages { + + @Resource + private ConnectionFactory connectionFactory; + + @Resource + private Queue chatQueue; + + public void sendMessage(String text) throws JMSException { + + Connection connection = null; + Session session = null; + + try { + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageProducer from the Session to the Topic or Queue + MessageProducer producer = session.createProducer(chatQueue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Create a message + TextMessage message = session.createTextMessage(text); + + // Tell the producer to send the message + producer.send(message); + } finally { + // Clean up + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + } + + public String receiveMessage() throws JMSException { + + Connection connection = null; + Session session = null; + MessageConsumer consumer = null; + try { + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(chatQueue); + + // Wait for a message + TextMessage message = (TextMessage) consumer.receive(1000); + + return message.getText(); + } finally { + if (consumer != null) { + consumer.close(); + } + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + + } + + public void blockConnection() { + try { + connectionFactory.createConnection(); + } catch (final JMSException e) { + throw new IllegalStateException(e); + } + } + } +}
