Author: andygumbrecht
Date: Mon Jul 21 13:11:53 2014
New Revision: 1612260

URL: http://svn.apache.org/r1612260
Log:
Fix StatelessInstanceManagerPoolingTest - ConcurrentAccessTimeoutException
Pool - Unhandled ex handler.
StatelessContainer - Synchronization

Modified:
    
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
    
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
    
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
    
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
    
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java

Modified: 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
--- 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
 (original)
+++ 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
 Mon Jul 21 13:11:53 2014
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
@@ -59,6 +60,7 @@ import static org.apache.openejb.core.tr
  */
 public class StatelessContainer implements org.apache.openejb.RpcContainer {
 
+    private final ReentrantLock lockRegistry = new ReentrantLock();
     private final ConcurrentMap<Class<?>, List<Method>> interceptorCache = new 
ConcurrentHashMap<Class<?>, List<Method>>();
     private final StatelessInstanceManager instanceManager;
     private final Map<String, BeanContext> deploymentRegistry = new 
HashMap<String, BeanContext>();
@@ -77,14 +79,30 @@ public class StatelessContainer implemen
     }
 
     @Override
-    public synchronized BeanContext[] getBeanContexts() {
-        return this.deploymentRegistry.values().toArray(new 
BeanContext[this.deploymentRegistry.size()]);
+    public BeanContext[] getBeanContexts() {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            return this.deploymentRegistry.values().toArray(new 
BeanContext[this.deploymentRegistry.size()]);
+        } finally {
+            l.unlock();
+        }
     }
 
     @Override
-    public synchronized BeanContext getBeanContext(final Object deploymentID) {
-        final String id = (String) deploymentID;
-        return deploymentRegistry.get(id);
+    public BeanContext getBeanContext(final Object deploymentID) {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            final String id = (String) deploymentID;
+            return deploymentRegistry.get(id);
+        } finally {
+            l.unlock();
+        }
     }
 
     @Override
@@ -99,10 +117,16 @@ public class StatelessContainer implemen
 
     @Override
     public void deploy(final BeanContext beanContext) throws OpenEJBException {
-        final String id = (String) beanContext.getDeploymentID();
-        synchronized (this) {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            final String id = (String) beanContext.getDeploymentID();
             deploymentRegistry.put(id, beanContext);
             beanContext.setContainer(this);
+        } finally {
+            l.unlock();
         }
 
         // add it before starting the timer (@PostCostruct)
@@ -131,11 +155,15 @@ public class StatelessContainer implemen
     public void undeploy(final BeanContext beanContext) {
         this.instanceManager.undeploy(beanContext);
 
-        synchronized (this) {
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+        try {
             final String id = (String) beanContext.getDeploymentID();
             beanContext.setContainer(null);
             beanContext.setContainerData(null);
             this.deploymentRegistry.remove(id);
+        } finally {
+            l.unlock();
         }
     }
 
@@ -163,6 +191,7 @@ public class StatelessContainer implemen
         final ThreadContext callContext = new ThreadContext(beanContext, 
primKey);
         final ThreadContext oldCallContext = ThreadContext.enter(callContext);
 
+        OpenEJBException oejbe = null;
         Instance bean = null;
         final CurrentCreationalContext currentCreationalContext = 
beanContext.get(CurrentCreationalContext.class);
 
@@ -196,22 +225,28 @@ public class StatelessContainer implemen
                 currentCreationalContext.set(bean.creationalContext);
             }
             return _invoke(callMethod, runMethod, args, bean, callContext, 
type);
+        } catch (final OpenEJBException t) {
+            oejbe = t;
+        }
 
-        } finally {
-
-            if (bean != null) {
-                if (callContext.isDiscardInstance()) {
-                    this.instanceManager.discardInstance(callContext, bean);
-                } else {
-                    this.instanceManager.poolInstance(callContext, bean);
-                }
+        if (bean != null) {
+            if (callContext.isDiscardInstance()) {
+                this.instanceManager.discardInstance(callContext, bean);
+            } else {
+                this.instanceManager.poolInstance(callContext, bean);
             }
+        }
 
-            ThreadContext.exit(oldCallContext);
+        ThreadContext.exit(oldCallContext);
 
-            if (currentCreationalContext != null) {
-                currentCreationalContext.remove();
-            }
+        if (currentCreationalContext != null) {
+            currentCreationalContext.remove();
+        }
+
+        if (null != oejbe) {
+            throw oejbe;
+        } else {
+            return null;
         }
     }
 

Modified: 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
--- 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 (original)
+++ 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 Mon Jul 21 13:11:53 2014
@@ -133,6 +133,7 @@ public class StatelessInstanceManager {
 
         @Override
         public void discard(final Instance instance, final Pool.Event reason) {
+
             final ThreadContext ctx = new ThreadContext(beanContext, null);
             final ThreadContext oldCallContext = ThreadContext.enter(ctx);
             try {
@@ -194,10 +195,11 @@ public class StatelessInstanceManager {
             throw new OpenEJBException("Unexpected Interruption of current 
thread: ", e);
         }
 
-        if (instance != null) {
-            return instance;
+        if (null == instance) {
+            instance = createInstance(callContext, beanContext);
         }
-        return createInstance(callContext, beanContext);
+
+        return instance;
     }
 
     private Instance createInstance(final ThreadContext callContext, final 
BeanContext beanContext) throws ApplicationException {
@@ -246,6 +248,7 @@ public class StatelessInstanceManager {
      * @throws OpenEJBException
      */
     public void poolInstance(final ThreadContext callContext, final Object 
bean) throws OpenEJBException {
+
         if (bean == null) {
             throw new SystemException("Invalid arguments");
         }
@@ -270,6 +273,7 @@ public class StatelessInstanceManager {
      * @param bean        Object
      */
     public void discardInstance(final ThreadContext callContext, final Object 
bean) throws SystemException {
+
         if (bean == null) {
             throw new SystemException("Invalid arguments");
         }
@@ -453,6 +457,7 @@ public class StatelessInstanceManager {
             if (!data.closePool()) {
                 logger.error("Timed-out waiting for stateless pool to close: 
for deployment '" + beanContext.getDeploymentID() + "'");
             }
+
         } catch (final InterruptedException e) {
             Thread.interrupted();
         }
@@ -502,12 +507,12 @@ public class StatelessInstanceManager {
     }
 
     private final class InstanceCreatorRunnable implements Runnable {
-        private long maxAge;
-        private long iteration;
-        private double maxAgeOffset;
-        private long min;
-        private Data data;
-        private StatelessSupplier supplier;
+        private final long maxAge;
+        private final long iteration;
+        private final double maxAgeOffset;
+        private final long min;
+        private final Data data;
+        private final StatelessSupplier supplier;
 
         private InstanceCreatorRunnable(final long maxAge, final long 
iteration, final long min, final double maxAgeOffset, final Data data, final 
StatelessSupplier supplier) {
             this.maxAge = maxAge;

Modified: 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
--- 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
 (original)
+++ 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
 Mon Jul 21 13:11:53 2014
@@ -31,7 +31,7 @@ public class DaemonThreadFactory impleme
 
     private final String name;
     private final ThreadGroup group;
-    private AtomicInteger ids = new AtomicInteger(0);
+    private final AtomicInteger ids = new AtomicInteger(0);
 
     public DaemonThreadFactory(final Object... name) {
         this.name = join(" ", name).trim();

Modified: 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
--- 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 (original)
+++ 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 Mon Jul 21 13:11:53 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
@@ -153,9 +154,30 @@ public class Pool<T> {
     }
 
     private Executor createExecutor() {
-        return new ThreadPoolExecutor(3, 10,
+        final ThreadPoolExecutor threadPoolExecutor = new 
ThreadPoolExecutor(3, 10,
             60L, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(2), new 
DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode()));
+
+        threadPoolExecutor.setRejectedExecutionHandler(new 
RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
+
+                if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
+                    return;
+                }
+
+                try {
+                    if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+                        
org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, 
"org.apache.openejb.util.resources")
+                            .warning("Default pool executor failed to run 
asynchronous process: " + r);
+                    }
+                } catch (final InterruptedException e) {
+                    //Ignore
+                }
+            }
+        });
+
+        return threadPoolExecutor;
     }
 
     private void greater(final String maxName, final long max, final String 
minName, final long min) {
@@ -420,6 +442,7 @@ public class Pool<T> {
     }
 
     public boolean close(final long timeout, final TimeUnit unit) throws 
InterruptedException {
+
         // drain all keys so no new instances will be accepted into the pool
         while (instances.tryAcquire()) {
             Thread.yield();

Modified: 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
--- 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
 (original)
+++ 
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
 Mon Jul 21 13:11:53 2014
@@ -34,7 +34,6 @@ import javax.ejb.Remote;
 import javax.ejb.Stateless;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -98,7 +97,6 @@ public class StatelessInstanceManagerPoo
         final int count = 50;
         final CountDownLatch invocations = new CountDownLatch(count);
         final InitialContext ctx = new InitialContext();
-        final ArrayList<Thread> threads = new ArrayList<Thread>(count);
 
         // 'count' instances should be created and discarded.
         for (int i = 0; i < count; i++) {
@@ -113,24 +111,32 @@ public class StatelessInstanceManagerPoo
                     }
                     final Counter counter = (Counter) object;
                     assertNotNull(counter);
-                    try {
-                        counter.explode();
-                    } catch (final Exception e) {
-                        invocations.countDown();
+
+                    boolean run = true;
+
+                    while (run) {
+                        try {
+                            counter.explode();
+                        } catch (final 
javax.ejb.ConcurrentAccessTimeoutException e) {
+                            //Try again in a moment...
+                            try {
+                                Thread.sleep(10);
+                            } catch (final InterruptedException ie) {
+                                //Ignore
+                            }
+                        } catch (final Exception e) {
+                            invocations.countDown();
+                            run = false;
+                        }
                     }
                 }
             }, "test-thread-" + count);
 
             thread.setDaemon(false);
-            threads.add(thread);
             thread.start();
         }
 
-        for (final Thread thread : threads) {
-            thread.join();
-        }
-
-        final boolean success = invocations.await(160, TimeUnit.SECONDS);
+        final boolean success = invocations.await(20, TimeUnit.SECONDS);
 
         assertTrue("invocations timeout -> invocations.getCount() == " + 
invocations.getCount(), success);
 
@@ -265,13 +271,9 @@ public class StatelessInstanceManagerPoo
             return instances.get();
         }
 
-        public int discardCount() {
-            return discardedInstances.get();
-        }
-
         public void explode() {
-            discardedInstances.incrementAndGet();
-            throw new NullPointerException("Test expected this null pointer");
+            final int i = discardedInstances.incrementAndGet();
+            throw new NullPointerException("Test expected this null pointer: " 
+ i);
         }
 
         public void race(final CountDownLatch ready, final CountDownLatch go) {


Reply via email to