Repository: tomee
Updated Branches:
  refs/heads/master dd28c89b2 -> 456a16d56


better cleaning of our thread after execution


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/456a16d5
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/456a16d5
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/456a16d5

Branch: refs/heads/master
Commit: 456a16d5610e79d3fb2ce57ec2362949bee41378
Parents: dd28c89
Author: Romain Manni-Bucau <[email protected]>
Authored: Wed Nov 18 15:30:01 2015 -0800
Committer: Romain Manni-Bucau <[email protected]>
Committed: Wed Nov 18 15:30:01 2015 -0800

----------------------------------------------------------------------
 .../openejb/assembler/classic/Assembler.java    |  14 ++
 .../core/security/AbstractSecurityService.java  |   8 +-
 .../core/stateless/StatelessContainer.java      |   8 +-
 .../stateless/StatelessInstanceManager.java     |  26 +++
 .../GeronimoTransactionManagerFactory.java      |  32 ++-
 .../openejb/testing/ApplicationComposers.java   |  11 +-
 .../activemq/ProperConnectionShutdownTest.java  | 227 +++++++++++++++++++
 7 files changed, 321 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
index 69149ef..7064052 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/assembler/classic/Assembler.java
@@ -1808,6 +1808,20 @@ public class Assembler extends AssemblerTool implements 
org.apache.openejb.spi.A
             systemInstance.removeComponent(EjbResolver.class);
             systemInstance.fireEvent(new AssemblerDestroyed());
             systemInstance.removeObservers();
+
+            if (DestroyableResource.class.isInstance(this.securityService)) {
+                
DestroyableResource.class.cast(this.securityService).destroyResource();
+            }
+            if (DestroyableResource.class.isInstance(this.transactionManager)) 
{
+                
DestroyableResource.class.cast(this.transactionManager).destroyResource();
+            }
+
+            for (final Container c : this.containerSystem.containers()) {
+                if (DestroyableResource.class.isInstance(c)) { // TODO: should 
we use auto closeable there?
+                    DestroyableResource.class.cast(c).destroyResource();
+                }
+            }
+
             SystemInstance.reset();
         } finally {
             l.unlock();

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java
index c894adc..2c1a798 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/security/AbstractSecurityService.java
@@ -19,6 +19,7 @@ package org.apache.openejb.core.security;
 
 import org.apache.openejb.BeanContext;
 import org.apache.openejb.InterfaceType;
+import org.apache.openejb.api.resource.DestroyableResource;
 import org.apache.openejb.core.ThreadContext;
 import org.apache.openejb.core.ThreadContextListener;
 import org.apache.openejb.core.security.jaas.GroupPrincipal;
@@ -58,7 +59,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * to clients, is mostly secure, and can be deserialized in a client vm without
  * addition openejb-core classes.
  */
-public abstract class AbstractSecurityService implements 
SecurityService<UUID>, ThreadContextListener, 
BasicPolicyConfiguration.RoleResolver {
+public abstract class AbstractSecurityService implements DestroyableResource, 
SecurityService<UUID>, ThreadContextListener, 
BasicPolicyConfiguration.RoleResolver {
 
     private static final Map<Object, Identity> identities = new 
ConcurrentHashMap<Object, Identity>();
     protected static final ThreadLocal<Identity> clientIdentity = new 
ThreadLocal<Identity>();
@@ -84,6 +85,11 @@ public abstract class AbstractSecurityService implements 
SecurityService<UUID>,
         
SystemInstance.get().setComponent(BasicPolicyConfiguration.RoleResolver.class, 
this);
     }
 
+    @Override
+    public void destroyResource() {
+        // no-op
+    }
+
     public String getRealmName() {
         return realmName;
     }

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
index 62bee31..11f43e0 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
@@ -23,6 +23,7 @@ import org.apache.openejb.InterfaceType;
 import org.apache.openejb.OpenEJBException;
 import org.apache.openejb.ProxyInfo;
 import org.apache.openejb.SystemException;
+import org.apache.openejb.api.resource.DestroyableResource;
 import org.apache.openejb.cdi.CurrentCreationalContext;
 import org.apache.openejb.core.ExceptionType;
 import org.apache.openejb.core.Operation;
@@ -59,7 +60,7 @@ import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste
 /**
  * @org.apache.xbean.XBean element="statelessContainer"
  */
-public class StatelessContainer implements org.apache.openejb.RpcContainer {
+public class StatelessContainer implements org.apache.openejb.RpcContainer, 
DestroyableResource {
 
     private final ConcurrentMap<Class<?>, List<Method>> interceptorCache = new 
ConcurrentHashMap<Class<?>, List<Method>>();
     private final StatelessInstanceManager instanceManager;
@@ -325,4 +326,9 @@ public class StatelessContainer implements 
org.apache.openejb.RpcContainer {
         }
         return annotated;
     }
+
+    @Override
+    public void destroyResource() {
+        this.instanceManager.destroy();
+    }
 }

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
index aa6617c..c8986f7 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
@@ -70,6 +70,9 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public class StatelessInstanceManager {
     private static final Logger logger = 
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
@@ -168,6 +171,29 @@ public class StatelessInstanceManager {
         }
     }
 
+    public void destroy() {
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(10000, MILLISECONDS)) {
+                    
java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING,
 getClass().getSimpleName() + " pool  timeout expired");
+                }
+            } catch (final InterruptedException e) {
+                Thread.interrupted();
+            }
+        }
+        if (scheduledExecutor != null) {
+            scheduledExecutor.shutdown();
+            try {
+                if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) {
+                    
java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING,
 getClass().getSimpleName() + " pool  timeout expired");
+                }
+            } catch (final InterruptedException e) {
+                Thread.interrupted();
+            }
+        }
+    }
+
     /**
      * Removes an instance from the pool and returns it for use
      * by the container in business methods.

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java
index 9681b92..5facdd4 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/GeronimoTransactionManagerFactory.java
@@ -19,8 +19,10 @@
 package org.apache.openejb.resource;
 
 import org.apache.geronimo.transaction.log.HOWLLog;
+import 
org.apache.geronimo.transaction.manager.ExponentialtIntervalRetryScheduler;
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.geronimo.transaction.manager.TransactionLog;
+import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
 import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
 import org.apache.geronimo.transaction.manager.XidFactory;
 import org.apache.geronimo.transaction.manager.XidFactoryImpl;
@@ -29,12 +31,16 @@ import org.apache.openejb.api.jmx.Description;
 import org.apache.openejb.api.jmx.MBean;
 import org.apache.openejb.api.jmx.ManagedAttribute;
 import org.apache.openejb.api.jmx.ManagedOperation;
+import org.apache.openejb.api.resource.DestroyableResource;
 import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.monitoring.LocalMBeanServer;
 import org.apache.openejb.monitoring.ObjectNameBuilder;
 import org.apache.openejb.util.Duration;
 
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
+import java.lang.reflect.Field;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -102,7 +108,7 @@ public class GeronimoTransactionManagerFactory {
             ((HOWLLog) txLog).doStart();
         }
 
-        final GeronimoTransactionManager geronimoTransactionManager = new 
GeronimoTransactionManager(defaultTransactionTimeoutSeconds, xidFactory, txLog);
+        final GeronimoTransactionManager geronimoTransactionManager = new 
DestroyableTransactionManager(defaultTransactionTimeoutSeconds, xidFactory, 
txLog);
         final ObjectNameBuilder jmxName = new 
ObjectNameBuilder("openejb.management")
             .set("j2eeType", "TransactionManager");
         LocalMBeanServer.registerDynamicWrapperSilently(
@@ -112,6 +118,30 @@ public class GeronimoTransactionManagerFactory {
         return geronimoTransactionManager;
     }
 
+    public static class DestroyableTransactionManager extends 
GeronimoTransactionManager implements DestroyableResource {
+        public DestroyableTransactionManager(final int 
defaultTransactionTimeoutSeconds, final XidFactory xidFactory, final 
TransactionLog transactionLog) throws XAException {
+            super(defaultTransactionTimeoutSeconds, xidFactory, 
transactionLog);
+        }
+
+        @Override
+        public void destroyResource() {
+            // try to clean up
+            try {
+                final Field f = 
TransactionManagerImpl.class.getDeclaredField("retryScheduler");
+                f.setAccessible(true);
+                final ExponentialtIntervalRetryScheduler rs = 
ExponentialtIntervalRetryScheduler.class.cast(f.get(this));
+
+                final Field t = 
ExponentialtIntervalRetryScheduler.class.getDeclaredField("timer");
+                t.setAccessible(true);
+
+                final Timer timer = Timer.class.cast(t.get(rs));
+                timer.cancel();
+            } catch (final Throwable notImportant) {
+                // no-op
+            }
+        }
+    }
+
     public static class GeronimoXAResourceWrapper implements XAResourceWrapper 
{
         public XAResource wrap(final XAResource xaResource, final String name) 
{
             return new WrapperNamedXAResource(xaResource, name);

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java
index 4b83058..ef07bf9 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/testing/ApplicationComposers.java
@@ -182,7 +182,10 @@ public class ApplicationComposers {
         testClassFinders.put(this, new ClassFinder(ancestors(klass))); // 
using this temporary since we don't have yet the instance
         if (additionalModules != null) {
             for (final Object o : additionalModules) {
-                testClassFinders.put(o, new 
ClassFinder(ancestors(o.getClass())));
+                final Class<?> aClass = o.getClass();
+                if (aClass != klass) {
+                    testClassFinders.put(o, new 
ClassFinder(ancestors(aClass)));
+                }
             }
         }
 
@@ -349,7 +352,11 @@ public class ApplicationComposers {
                 annotatedMethods = newAnnotatedMethods;
                 map.put(key, annotatedMethods);
             } else {
-                annotatedMethods.addAll(newAnnotatedMethods);
+                for (final Method m : newAnnotatedMethods) {
+                    if (!annotatedMethods.contains(m)) {
+                        annotatedMethods.add(m);
+                    }
+                }
             }
         }
         return map;

http://git-wip-us.apache.org/repos/asf/tomee/blob/456a16d5/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java
 
b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java
new file mode 100644
index 0000000..49d9a27
--- /dev/null
+++ 
b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ProperConnectionShutdownTest.java
@@ -0,0 +1,227 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.resource.activemq;
+
+import org.apache.openejb.jee.WebApp;
+import org.apache.openejb.junit.DeployApplication;
+import org.apache.openejb.testing.ApplicationComposers;
+import org.apache.openejb.testing.Classes;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.apache.openejb.util.Join;
+import org.apache.openejb.util.NetworkUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runners.model.Statement;
+
+import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.Stateless;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// inspired from MessagingBeanTest in examples
+public class ProperConnectionShutdownTest {
+    @Test
+    @Ignore("https://issues.apache.org/jira/browse/AMQ-6051";)
+    public void run() throws Throwable {
+        final Thread[] threadsBefore = listThreads();
+        final AtomicReference<Thread[]> threadWhile = new AtomicReference<>();
+
+        // run test
+        final Statement testInContainer = new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+                messages.sendMessage("Hello World!");
+                messages.sendMessage("How are you?");
+
+                threadWhile.set(listThreads());
+
+                messages.sendMessage("Still spinning?");
+
+                assertEquals(messages.receiveMessage(), "Hello World!");
+                assertEquals(messages.receiveMessage(), "How are you?");
+                assertEquals(messages.receiveMessage(), "Still spinning?");
+
+                /* TODO: activate it when AMQ-6051 is fixed
+
+                // all worked, now hold a connection
+                new Thread(new Runnable() { // not daemon!
+                    @Override
+                    public void run() {
+                        messages.blockConnection(); // oops, I forgot to close 
it
+                    }
+                }).start();
+                 */
+            }
+        };
+        new DeployApplication(this, testInContainer, new 
ApplicationComposers(this)).evaluate();
+
+        Thread.sleep(2250); // AMQ state (started) polling for transport 
thread is 1s
+        while (Join.join("", listThreads()).contains("ActiveMQ Session Task")) 
{ // let few sec to AMQ to leave the holding task
+            Thread.sleep(1000);
+        }
+
+        // ensure no connection are leaking
+        final Thread[] threadsAfter = listThreads();
+
+        int countAMQ = 0;
+        int countOthers = 0;
+        for (final Thread t : threadsAfter) {
+            if (!t.isAlive()) {
+                continue;
+            }
+            if (t.getName().contains("AMQ") || 
t.getName().toLowerCase(Locale.ENGLISH).contains("activemq")) {
+                countAMQ++;
+            } else {
+                countOthers++;
+            }
+        }
+
+        final String debugMessage = Join.join(", ", threadsAfter);
+
+        assertEquals(debugMessage, 0, countAMQ);
+
+        // geronimo libs spawn 2 threads we know: PoolIdleReleaseTimer and 
CurrentTime so we can get initial + 2 threads there
+        assertTrue(debugMessage, countOthers <= threadsBefore.length + 2);
+    }
+
+    private Thread[] listThreads() {
+        final Thread[] threads = new Thread[Thread.activeCount()];
+        final int count = Thread.enumerate(threads);
+        if (count < threads.length) {
+            final Thread[] copy = new Thread[count];
+            System.arraycopy(threads, 0, copy, 0, count);
+            return copy;
+        }
+        return threads;
+    }
+
+    @EJB
+    private Messages messages;
+
+    @Configuration
+    public Properties config() {
+        return new PropertiesBuilder()
+            .p("Default JMS Resource Adapter.BrokerXmlConfig", 
"broker:(tcp://localhost:" + NetworkUtil.getNextAvailablePort() + 
")?useJmx=false")
+            .build();
+    }
+
+    @Module
+    @Classes(innerClassesAsBean = true)
+    public WebApp app() {
+        return new WebApp();
+    }
+
+    @Stateless
+    public static class Messages {
+
+        @Resource
+        private ConnectionFactory connectionFactory;
+
+        @Resource
+        private Queue chatQueue;
+
+        public void sendMessage(String text) throws JMSException {
+
+            Connection connection = null;
+            Session session = null;
+
+            try {
+                connection = connectionFactory.createConnection();
+                connection.start();
+
+                // Create a Session
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                // Create a MessageProducer from the Session to the Topic or 
Queue
+                MessageProducer producer = session.createProducer(chatQueue);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+                // Create a message
+                TextMessage message = session.createTextMessage(text);
+
+                // Tell the producer to send the message
+                producer.send(message);
+            } finally {
+                // Clean up
+                if (session != null) {
+                    session.close();
+                }
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+        }
+
+        public String receiveMessage() throws JMSException {
+
+            Connection connection = null;
+            Session session = null;
+            MessageConsumer consumer = null;
+            try {
+                connection = connectionFactory.createConnection();
+                connection.start();
+
+                // Create a Session
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                // Create a MessageConsumer from the Session to the Topic or 
Queue
+                consumer = session.createConsumer(chatQueue);
+
+                // Wait for a message
+                TextMessage message = (TextMessage) consumer.receive(1000);
+
+                return message.getText();
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+                if (session != null) {
+                    session.close();
+                }
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+
+        }
+
+        public void blockConnection() {
+            try {
+                connectionFactory.createConnection();
+            } catch (final JMSException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+}

Reply via email to