Author: andygumbrecht
Date: Mon Jul 21 13:12:59 2014
New Revision: 1612261
URL: http://svn.apache.org/r1612261
Log:
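Fix StatelessInstanceManagerPoolingTest - retry the call on ConcurrentAccessTimeoutException.
Pool - Add a rejected-execution handler to the default executor.
StatelessContainer - Synchronize registry access with a ReentrantLock instead of 'synchronized'.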
Modified:
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
Modified:
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
---
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
(original)
+++
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
Mon Jul 21 13:12:59 2014
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
@@ -59,6 +60,7 @@ import static org.apache.openejb.core.tr
*/
public class StatelessContainer implements org.apache.openejb.RpcContainer {
+ private final ReentrantLock lockRegistry = new ReentrantLock();
private final ConcurrentMap<Class<?>, List<Method>> interceptorCache = new
ConcurrentHashMap<Class<?>, List<Method>>();
private final StatelessInstanceManager instanceManager;
private final Map<String, BeanContext> deploymentRegistry = new
HashMap<String, BeanContext>();
@@ -77,14 +79,30 @@ public class StatelessContainer implemen
}
@Override
- public synchronized BeanContext[] getBeanContexts() {
- return this.deploymentRegistry.values().toArray(new
BeanContext[this.deploymentRegistry.size()]);
+ public BeanContext[] getBeanContexts() {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ return this.deploymentRegistry.values().toArray(new
BeanContext[this.deploymentRegistry.size()]);
+ } finally {
+ l.unlock();
+ }
}
@Override
- public synchronized BeanContext getBeanContext(final Object deploymentID) {
- final String id = (String) deploymentID;
- return deploymentRegistry.get(id);
+ public BeanContext getBeanContext(final Object deploymentID) {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ final String id = (String) deploymentID;
+ return deploymentRegistry.get(id);
+ } finally {
+ l.unlock();
+ }
}
@Override
@@ -99,10 +117,16 @@ public class StatelessContainer implemen
@Override
public void deploy(final BeanContext beanContext) throws OpenEJBException {
- final String id = (String) beanContext.getDeploymentID();
- synchronized (this) {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ final String id = (String) beanContext.getDeploymentID();
deploymentRegistry.put(id, beanContext);
beanContext.setContainer(this);
+ } finally {
+ l.unlock();
}
// add it before starting the timer (@PostCostruct)
@@ -131,11 +155,15 @@ public class StatelessContainer implemen
public void undeploy(final BeanContext beanContext) {
this.instanceManager.undeploy(beanContext);
- synchronized (this) {
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+ try {
final String id = (String) beanContext.getDeploymentID();
beanContext.setContainer(null);
beanContext.setContainerData(null);
this.deploymentRegistry.remove(id);
+ } finally {
+ l.unlock();
}
}
@@ -163,6 +191,7 @@ public class StatelessContainer implemen
final ThreadContext callContext = new ThreadContext(beanContext,
primKey);
final ThreadContext oldCallContext = ThreadContext.enter(callContext);
+ OpenEJBException oejbe = null;
Instance bean = null;
final CurrentCreationalContext currentCreationalContext =
beanContext.get(CurrentCreationalContext.class);
@@ -196,22 +225,28 @@ public class StatelessContainer implemen
currentCreationalContext.set(bean.creationalContext);
}
return _invoke(callMethod, runMethod, args, bean, callContext,
type);
+ } catch (final OpenEJBException t) {
+ oejbe = t;
+ }
- } finally {
-
- if (bean != null) {
- if (callContext.isDiscardInstance()) {
- this.instanceManager.discardInstance(callContext, bean);
- } else {
- this.instanceManager.poolInstance(callContext, bean);
- }
+ if (bean != null) {
+ if (callContext.isDiscardInstance()) {
+ this.instanceManager.discardInstance(callContext, bean);
+ } else {
+ this.instanceManager.poolInstance(callContext, bean);
}
+ }
- ThreadContext.exit(oldCallContext);
+ ThreadContext.exit(oldCallContext);
- if (currentCreationalContext != null) {
- currentCreationalContext.remove();
- }
+ if (currentCreationalContext != null) {
+ currentCreationalContext.remove();
+ }
+
+ if (null != oejbe) {
+ throw oejbe;
+ } else {
+ return null;
}
}
Modified:
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
---
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
(original)
+++
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
Mon Jul 21 13:12:59 2014
@@ -133,6 +133,7 @@ public class StatelessInstanceManager {
@Override
public void discard(final Instance instance, final Pool.Event reason) {
+
final ThreadContext ctx = new ThreadContext(beanContext, null);
final ThreadContext oldCallContext = ThreadContext.enter(ctx);
try {
@@ -194,11 +195,12 @@ public class StatelessInstanceManager {
throw new OpenEJBException("Unexpected Interruption of current
thread: ", e);
}
- if (instance != null) {
+ if (null == instance) {
+ instance = createInstance(callContext, beanContext);
+ }
+
return instance;
}
- return createInstance(callContext, beanContext);
- }
private Instance createInstance(final ThreadContext callContext, final
BeanContext beanContext) throws ApplicationException {
try {
@@ -246,6 +248,7 @@ public class StatelessInstanceManager {
* @throws OpenEJBException
*/
public void poolInstance(final ThreadContext callContext, final Object
bean) throws OpenEJBException {
+
if (bean == null) {
throw new SystemException("Invalid arguments");
}
@@ -270,6 +273,7 @@ public class StatelessInstanceManager {
* @param bean Object
*/
public void discardInstance(final ThreadContext callContext, final Object
bean) throws SystemException {
+
if (bean == null) {
throw new SystemException("Invalid arguments");
}
@@ -453,6 +457,7 @@ public class StatelessInstanceManager {
if (!data.closePool()) {
logger.error("Timed-out waiting for stateless pool to close:
for deployment '" + beanContext.getDeploymentID() + "'");
}
+
} catch (final InterruptedException e) {
Thread.interrupted();
}
Modified:
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
---
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
(original)
+++
tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
Mon Jul 21 13:12:59 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
@@ -153,9 +154,30 @@ public class Pool<T> {
}
private Executor createExecutor() {
- return new ThreadPoolExecutor(3, 10,
+ final ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(3, 10,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2), new
DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode()));
+
+ threadPoolExecutor.setRejectedExecutionHandler(new
RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
+ return;
+ }
+
+ try {
+ if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+
org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB,
"org.apache.openejb.util.resources")
+ .warning("Default pool executor failed to run
asynchronous process: " + r);
+ }
+ } catch (final InterruptedException e) {
+ //Ignore
+ }
+ }
+ });
+
+ return threadPoolExecutor;
}
private void greater(final String maxName, final long max, final String
minName, final long min) {
@@ -420,6 +442,7 @@ public class Pool<T> {
}
public boolean close(final long timeout, final TimeUnit unit) throws
InterruptedException {
+
// drain all keys so no new instances will be accepted into the pool
while (instances.tryAcquire()) {
Thread.yield();
Modified:
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
---
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
(original)
+++
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/singleton/AsyncPostContructTest.java
Mon Jul 21 13:12:59 2014
@@ -34,6 +34,8 @@ import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -71,27 +73,27 @@ public class AsyncPostContructTest {
private SessionContext sc;
private Future<Boolean> future;
- private long startEnd;
- private long asyncStart;
- private Object startInstance;
- private Object asyncInstance;
+ private final AtomicLong startEnd = new AtomicLong();
+ private final AtomicLong asyncStart = new AtomicLong();
+ private final AtomicReference<Object> startInstance = new
AtomicReference<Object>();
+ private final AtomicReference<Object> asyncInstance = new
AtomicReference<Object>();
@PostConstruct
public void start() {
- startInstance = this;
+ startInstance.set(this);
future = sc.getBusinessObject(BuildMeAsync.class).async();
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
// no-op
}
- startEnd = System.nanoTime();
+ startEnd.set(System.nanoTime());
}
@Asynchronous
public Future<Boolean> async() {
- asyncStart = System.nanoTime();
- asyncInstance = this;
+ asyncStart.set(System.nanoTime());
+ asyncInstance.set(this);
return new AsyncResult<Boolean>(true);
}
@@ -104,19 +106,19 @@ public class AsyncPostContructTest {
}
public long getStartEnd() {
- return startEnd;
+ return startEnd.get();
}
public long getAsyncStart() {
- return asyncStart;
+ return asyncStart.get();
}
public Object getStartInstance() {
- return startInstance;
+ return startInstance.get();
}
public Object getAsyncInstance() {
- return asyncInstance;
+ return asyncInstance.get();
}
}
}
Modified:
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java?rev=1612261&r1=1612260&r2=1612261&view=diff
==============================================================================
---
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
(original)
+++
tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
Mon Jul 21 13:12:59 2014
@@ -34,7 +34,6 @@ import javax.ejb.Remote;
import javax.ejb.Stateless;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -101,8 +100,6 @@ public class StatelessInstanceManagerPoo
final int count = 30;
final CountDownLatch invocations = new CountDownLatch(count);
final InitialContext ctx = new InitialContext();
- final Runnable counterBeanLocal = new Runnable() {
- public void run() {
Object object = null;
try {
@@ -123,14 +120,46 @@ public class StatelessInstanceManagerPoo
// 'count' instances should be created and discarded.
final Collection<Thread> th = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
- final Thread thread = new Thread(counterBeanLocal);
- th.add(thread);
+ final Thread thread = new Thread(new Runnable() {
+ public void run() {
+
+ Object object = null;
+ try {
+ object = ctx.lookup("CounterBeanLocal");
+ } catch (final NamingException e) {
+ assertTrue(false);
+ }
+ final Counter counter = (Counter) object;
+ assertNotNull(counter);
+
+ boolean run = true;
+
+ while (run) {
+ try {
+ counter.explode();
+ } catch (final
javax.ejb.ConcurrentAccessTimeoutException e) {
+ //Try again in a moment...
+ try {
+ Thread.sleep(10);
+ } catch (final InterruptedException ie) {
+ //Ignore
+ }
+ } catch (final Exception e) {
+ invocations.countDown();
+ run = false;
+ }
+ }
+ }
+ }, "test-thread-" + count);
+
+ thread.setDaemon(false);
thread.start();
}
- for (final Thread t : th) {
- t.join();
- }
+ final boolean success = invocations.await(20, TimeUnit.SECONDS);
+
+ assertTrue("invocations timeout -> invocations.getCount() == " +
invocations.getCount(), success);
+
assertEquals(count, discardedInstances.get());
}
@@ -263,17 +292,9 @@ public class StatelessInstanceManagerPoo
return instances.get();
}
- public int discardCount() {
- return discardedInstances.get();
- }
-
public void explode(final CountDownLatch latch) {
- discardedInstances.incrementAndGet();
- try {
- throw new NullPointerException("Test expected this null
pointer: " + latch.getCount());
- } finally {
- latch.countDown();
- }
+ final int i = discardedInstances.incrementAndGet();
+ throw new NullPointerException("Test expected this null pointer: "
+ i);
}
public void race(final CountDownLatch ready, final CountDownLatch go) {