Author: andygumbrecht
Date: Mon Jul 21 13:12:59 2014
New Revision: 1612261

URL: http://svn.apache.org/r1612261
Log:
Fix StatelessInstanceManagerPoolingTest - Retry the call on ConcurrentAccessTimeoutException.
Pool - Add a RejectedExecutionHandler to the default executor.
StatelessContainer - Synchronize deployment registry access with a ReentrantLock.

Modified:
    
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
    
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
    
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
    
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
    
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java

Modified: 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
--- 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
 (original)
+++ 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
 Mon Jul 21 13:12:59 2014
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
@@ -59,6 +60,7 @@ import static org.apache.openejb.core.tr
  */
 public class StatelessContainer implements org.apache.openejb.RpcContainer {
 
+    private final ReentrantLock lockRegistry = new ReentrantLock();
     private final ConcurrentMap<Class<?>, List<Method>> interceptorCache = new 
ConcurrentHashMap<Class<?>, List<Method>>();
     private final StatelessInstanceManager instanceManager;
     private final Map<String, BeanContext> deploymentRegistry = new 
HashMap<String, BeanContext>();
@@ -77,14 +79,30 @@ public class StatelessContainer implemen
     }
 
     @Override
-    public synchronized BeanContext[] getBeanContexts() {
-        return this.deploymentRegistry.values().toArray(new 
BeanContext[this.deploymentRegistry.size()]);
+    public BeanContext[] getBeanContexts() {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            return this.deploymentRegistry.values().toArray(new 
BeanContext[this.deploymentRegistry.size()]);
+        } finally {
+            l.unlock();
+        }
     }
 
     @Override
-    public synchronized BeanContext getBeanContext(final Object deploymentID) {
-        final String id = (String) deploymentID;
-        return deploymentRegistry.get(id);
+    public BeanContext getBeanContext(final Object deploymentID) {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            final String id = (String) deploymentID;
+            return deploymentRegistry.get(id);
+        } finally {
+            l.unlock();
+        }
     }
 
     @Override
@@ -99,10 +117,16 @@ public class StatelessContainer implemen
 
     @Override
     public void deploy(final BeanContext beanContext) throws OpenEJBException {
-        final String id = (String) beanContext.getDeploymentID();
-        synchronized (this) {
+
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+
+        try {
+            final String id = (String) beanContext.getDeploymentID();
             deploymentRegistry.put(id, beanContext);
             beanContext.setContainer(this);
+        } finally {
+            l.unlock();
         }
 
         // add it before starting the timer (@PostConstruct)
@@ -131,11 +155,15 @@ public class StatelessContainer implemen
     public void undeploy(final BeanContext beanContext) {
         this.instanceManager.undeploy(beanContext);
 
-        synchronized (this) {
+        final ReentrantLock l = lockRegistry;
+        l.lock();
+        try {
             final String id = (String) beanContext.getDeploymentID();
             beanContext.setContainer(null);
             beanContext.setContainerData(null);
             this.deploymentRegistry.remove(id);
+        } finally {
+            l.unlock();
         }
     }
 
@@ -163,6 +191,7 @@ public class StatelessContainer implemen
         final ThreadContext callContext = new ThreadContext(beanContext, 
primKey);
         final ThreadContext oldCallContext = ThreadContext.enter(callContext);
 
+        OpenEJBException oejbe = null;
         Instance bean = null;
         final CurrentCreationalContext currentCreationalContext = 
beanContext.get(CurrentCreationalContext.class);
 
@@ -196,22 +225,28 @@ public class StatelessContainer implemen
                 currentCreationalContext.set(bean.creationalContext);
             }
             return _invoke(callMethod, runMethod, args, bean, callContext, 
type);
+        } catch (final OpenEJBException t) {
+            oejbe = t;
+        }
 
-        } finally {
-
-            if (bean != null) {
-                if (callContext.isDiscardInstance()) {
-                    this.instanceManager.discardInstance(callContext, bean);
-                } else {
-                    this.instanceManager.poolInstance(callContext, bean);
-                }
+        if (bean != null) {
+            if (callContext.isDiscardInstance()) {
+                this.instanceManager.discardInstance(callContext, bean);
+            } else {
+                this.instanceManager.poolInstance(callContext, bean);
             }
+        }
 
-            ThreadContext.exit(oldCallContext);
+        ThreadContext.exit(oldCallContext);
 
-            if (currentCreationalContext != null) {
-                currentCreationalContext.remove();
-            }
+        if (currentCreationalContext != null) {
+            currentCreationalContext.remove();
+        }
+
+        if (null != oejbe) {
+            throw oejbe;
+        } else {
+            return null;
         }
     }
 

Modified: 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
--- 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 (original)
+++ 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
 Mon Jul 21 13:12:59 2014
@@ -133,6 +133,7 @@ public class StatelessInstanceManager {
 
         @Override
         public void discard(final Instance instance, final Pool.Event reason) {
+
             final ThreadContext ctx = new ThreadContext(beanContext, null);
             final ThreadContext oldCallContext = ThreadContext.enter(ctx);
             try {
@@ -194,11 +195,12 @@ public class StatelessInstanceManager {
             throw new OpenEJBException("Unexpected Interruption of current 
thread: ", e);
         }
 
-        if (instance != null) {
+        if (null == instance) {
+            instance = createInstance(callContext, beanContext);
+        }
+
             return instance;
         }
-        return createInstance(callContext, beanContext);
-    }
 
     private Instance createInstance(final ThreadContext callContext, final 
BeanContext beanContext) throws ApplicationException {
         try {
@@ -246,6 +248,7 @@ public class StatelessInstanceManager {
      * @throws OpenEJBException
      */
     public void poolInstance(final ThreadContext callContext, final Object 
bean) throws OpenEJBException {
+
         if (bean == null) {
             throw new SystemException("Invalid arguments");
         }
@@ -270,6 +273,7 @@ public class StatelessInstanceManager {
      * @param bean        Object
      */
     public void discardInstance(final ThreadContext callContext, final Object 
bean) throws SystemException {
+
         if (bean == null) {
             throw new SystemException("Invalid arguments");
         }
@@ -453,6 +457,7 @@ public class StatelessInstanceManager {
             if (!data.closePool()) {
                 logger.error("Timed-out waiting for stateless pool to close: 
for deployment '" + beanContext.getDeploymentID() + "'");
             }
+
         } catch (final InterruptedException e) {
             Thread.interrupted();
         }

Modified: 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
--- 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 (original)
+++ 
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 Mon Jul 21 13:12:59 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
@@ -153,9 +154,30 @@ public class Pool<T> {
     }
 
     private Executor createExecutor() {
-        return new ThreadPoolExecutor(3, 10,
+        final ThreadPoolExecutor threadPoolExecutor = new 
ThreadPoolExecutor(3, 10,
             60L, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(2), new 
DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode()));
+
+        threadPoolExecutor.setRejectedExecutionHandler(new 
RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
+
+                if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
+                    return;
+    }
+
+                try {
+                    if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+                        
org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, 
"org.apache.openejb.util.resources")
+                            .warning("Default pool executor failed to run 
asynchronous process: " + r);
+                    }
+                } catch (final InterruptedException e) {
+                    //Ignore
+                }
+            }
+        });
+
+        return threadPoolExecutor;
     }
 
     private void greater(final String maxName, final long max, final String 
minName, final long min) {
@@ -420,6 +442,7 @@ public class Pool<T> {
     }
 
     public boolean close(final long timeout, final TimeUnit unit) throws 
InterruptedException {
+
         // drain all keys so no new instances will be accepted into the pool
         while (instances.tryAcquire()) {
             Thread.yield();

Modified: 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
--- 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
 (original)
+++ 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
 Mon Jul 21 13:12:59 2014
@@ -34,6 +34,8 @@ import javax.ejb.SessionContext;
 import javax.ejb.Singleton;
 import javax.ejb.Startup;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -71,27 +73,27 @@ public class AsyncPostContructTest {
         private SessionContext sc;
 
         private Future<Boolean> future;
-        private long startEnd;
-        private long asyncStart;
-        private Object startInstance;
-        private Object asyncInstance;
+        private final AtomicLong startEnd = new AtomicLong();
+        private final AtomicLong asyncStart = new AtomicLong();
+        private final AtomicReference<Object> startInstance = new 
AtomicReference<Object>();
+        private final AtomicReference<Object> asyncInstance = new 
AtomicReference<Object>();
 
         @PostConstruct
         public void start() {
-            startInstance = this;
+            startInstance.set(this);
             future = sc.getBusinessObject(BuildMeAsync.class).async();
             try {
                 Thread.sleep(100);
             } catch (final InterruptedException e) {
                 // no-op
             }
-            startEnd = System.nanoTime();
+            startEnd.set(System.nanoTime());
         }
 
         @Asynchronous
         public Future<Boolean> async() {
-            asyncStart = System.nanoTime();
-            asyncInstance = this;
+            asyncStart.set(System.nanoTime());
+            asyncInstance.set(this);
             return new AsyncResult<Boolean>(true);
         }
 
@@ -104,19 +106,19 @@ public class AsyncPostContructTest {
         }
 
         public long getStartEnd() {
-            return startEnd;
+            return startEnd.get();
         }
 
         public long getAsyncStart() {
-            return asyncStart;
+            return asyncStart.get();
         }
 
         public Object getStartInstance() {
-            return startInstance;
+            return startInstance.get();
         }
 
         public Object getAsyncInstance() {
-            return asyncInstance;
+            return asyncInstance.get();
         }
     }
 }

Modified: 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
--- 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
 (original)
+++ 
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
 Mon Jul 21 13:12:59 2014
@@ -34,7 +34,6 @@ import javax.ejb.Remote;
 import javax.ejb.Stateless;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -101,8 +100,6 @@ public class StatelessInstanceManagerPoo
         final int count = 30;
         final CountDownLatch invocations = new CountDownLatch(count);
         final InitialContext ctx = new InitialContext();
-        final Runnable counterBeanLocal = new Runnable() {
-            public void run() {
 
                 Object object = null;
                 try {
@@ -123,14 +120,46 @@ public class StatelessInstanceManagerPoo
         // 'count' instances should be created and discarded.
         final Collection<Thread> th = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
-            final Thread thread = new Thread(counterBeanLocal);
-            th.add(thread);
+            final Thread thread = new Thread(new Runnable() {
+                public void run() {
+
+                    Object object = null;
+                    try {
+                        object = ctx.lookup("CounterBeanLocal");
+                    } catch (final NamingException e) {
+                        assertTrue(false);
+                    }
+                    final Counter counter = (Counter) object;
+                    assertNotNull(counter);
+
+                    boolean run = true;
+
+                    while (run) {
+                        try {
+                            counter.explode();
+                        } catch (final 
javax.ejb.ConcurrentAccessTimeoutException e) {
+                            //Try again in a moment...
+                            try {
+                                Thread.sleep(10);
+                            } catch (final InterruptedException ie) {
+                                //Ignore
+                            }
+                        } catch (final Exception e) {
+                            invocations.countDown();
+                            run = false;
+                        }
+                    }
+                }
+            }, "test-thread-" + count);
+
+            thread.setDaemon(false);
             thread.start();
         }
 
-        for (final Thread t : th) {
-            t.join();
-        }
+        final boolean success = invocations.await(20, TimeUnit.SECONDS);
+
+        assertTrue("invocations timeout -> invocations.getCount() == " + 
invocations.getCount(), success);
+
         assertEquals(count, discardedInstances.get());
 
     }
@@ -263,17 +292,9 @@ public class StatelessInstanceManagerPoo
             return instances.get();
         }
 
-        public int discardCount() {
-            return discardedInstances.get();
-        }
-
         public void explode(final CountDownLatch latch) {
-            discardedInstances.incrementAndGet();
-            try {
-                throw new NullPointerException("Test expected this null 
pointer: " + latch.getCount());
-            } finally {
-                latch.countDown();
-            }
+            final int i = discardedInstances.incrementAndGet();
+            throw new NullPointerException("Test expected this null pointer: " 
+ i);
         }
 
         public void race(final CountDownLatch ready, final CountDownLatch go) {


Reply via email to