Hi Andy
Why is this lock needed? this should be fine since updates are only during
deployment. If not I'm tempted to fix it instead of adding a lock
Romain Manni-Bucau
Twitter: @rmannibucau
Blog: http://rmannibucau.wordpress.com/
LinkedIn: http://fr.linkedin.com/in/rmannibucau
Github: https://github.com/rmannibucau
---------- Forwarded message ----------
From: <andygumbre...@apache.org>
Date: 2014-07-21 15:11 GMT+02:00
Subject: svn commit: r1612260 - in
/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src:
main/java/org/apache/openejb/core/stateless/
main/java/org/apache/openejb/util/
test/java/org/apache/openejb/core/stateless/
To: comm...@tomee.apache.org
Author: andygumbrecht
Date: Mon Jul 21 13:11:53 2014
New Revision: 1612260
URL: http://svn.apache.org/r1612260
Log:
Fix StatelessInstanceManagerPoolingTest - ConcurrentAccessTimeoutException
Pool - Unhandled ex handler.
StatelessContainer - Synchronization
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
---
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
(original)
+++
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java
Mon Jul 21 13:11:53 2014
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
@@ -59,6 +60,7 @@ import static org.apache.openejb.core.tr
*/
public class StatelessContainer implements org.apache.openejb.RpcContainer
{
+ private final ReentrantLock lockRegistry = new ReentrantLock();
private final ConcurrentMap<Class<?>, List<Method>> interceptorCache =
new ConcurrentHashMap<Class<?>, List<Method>>();
private final StatelessInstanceManager instanceManager;
private final Map<String, BeanContext> deploymentRegistry = new
HashMap<String, BeanContext>();
@@ -77,14 +79,30 @@ public class StatelessContainer implemen
}
@Override
- public synchronized BeanContext[] getBeanContexts() {
- return this.deploymentRegistry.values().toArray(new
BeanContext[this.deploymentRegistry.size()]);
+ public BeanContext[] getBeanContexts() {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ return this.deploymentRegistry.values().toArray(new
BeanContext[this.deploymentRegistry.size()]);
+ } finally {
+ l.unlock();
+ }
}
@Override
- public synchronized BeanContext getBeanContext(final Object
deploymentID) {
- final String id = (String) deploymentID;
- return deploymentRegistry.get(id);
+ public BeanContext getBeanContext(final Object deploymentID) {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ final String id = (String) deploymentID;
+ return deploymentRegistry.get(id);
+ } finally {
+ l.unlock();
+ }
}
@Override
@@ -99,10 +117,16 @@ public class StatelessContainer implemen
@Override
public void deploy(final BeanContext beanContext) throws
OpenEJBException {
- final String id = (String) beanContext.getDeploymentID();
- synchronized (this) {
+
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+
+ try {
+ final String id = (String) beanContext.getDeploymentID();
deploymentRegistry.put(id, beanContext);
beanContext.setContainer(this);
+ } finally {
+ l.unlock();
}
// add it before starting the timer (@PostCostruct)
@@ -131,11 +155,15 @@ public class StatelessContainer implemen
public void undeploy(final BeanContext beanContext) {
this.instanceManager.undeploy(beanContext);
- synchronized (this) {
+ final ReentrantLock l = lockRegistry;
+ l.lock();
+ try {
final String id = (String) beanContext.getDeploymentID();
beanContext.setContainer(null);
beanContext.setContainerData(null);
this.deploymentRegistry.remove(id);
+ } finally {
+ l.unlock();
}
}
@@ -163,6 +191,7 @@ public class StatelessContainer implemen
final ThreadContext callContext = new ThreadContext(beanContext,
primKey);
final ThreadContext oldCallContext =
ThreadContext.enter(callContext);
+ OpenEJBException oejbe = null;
Instance bean = null;
final CurrentCreationalContext currentCreationalContext =
beanContext.get(CurrentCreationalContext.class);
@@ -196,22 +225,28 @@ public class StatelessContainer implemen
currentCreationalContext.set(bean.creationalContext);
}
return _invoke(callMethod, runMethod, args, bean, callContext,
type);
+ } catch (final OpenEJBException t) {
+ oejbe = t;
+ }
- } finally {
-
- if (bean != null) {
- if (callContext.isDiscardInstance()) {
- this.instanceManager.discardInstance(callContext,
bean);
- } else {
- this.instanceManager.poolInstance(callContext, bean);
- }
+ if (bean != null) {
+ if (callContext.isDiscardInstance()) {
+ this.instanceManager.discardInstance(callContext, bean);
+ } else {
+ this.instanceManager.poolInstance(callContext, bean);
}
+ }
- ThreadContext.exit(oldCallContext);
+ ThreadContext.exit(oldCallContext);
- if (currentCreationalContext != null) {
- currentCreationalContext.remove();
- }
+ if (currentCreationalContext != null) {
+ currentCreationalContext.remove();
+ }
+
+ if (null != oejbe) {
+ throw oejbe;
+ } else {
+ return null;
}
}
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
---
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
(original)
+++
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java
Mon Jul 21 13:11:53 2014
@@ -133,6 +133,7 @@ public class StatelessInstanceManager {
@Override
public void discard(final Instance instance, final Pool.Event
reason) {
+
final ThreadContext ctx = new ThreadContext(beanContext, null);
final ThreadContext oldCallContext = ThreadContext.enter(ctx);
try {
@@ -194,10 +195,11 @@ public class StatelessInstanceManager {
throw new OpenEJBException("Unexpected Interruption of current
thread: ", e);
}
- if (instance != null) {
- return instance;
+ if (null == instance) {
+ instance = createInstance(callContext, beanContext);
}
- return createInstance(callContext, beanContext);
+
+ return instance;
}
private Instance createInstance(final ThreadContext callContext, final
BeanContext beanContext) throws ApplicationException {
@@ -246,6 +248,7 @@ public class StatelessInstanceManager {
* @throws OpenEJBException
*/
public void poolInstance(final ThreadContext callContext, final Object
bean) throws OpenEJBException {
+
if (bean == null) {
throw new SystemException("Invalid arguments");
}
@@ -270,6 +273,7 @@ public class StatelessInstanceManager {
* @param bean Object
*/
public void discardInstance(final ThreadContext callContext, final
Object bean) throws SystemException {
+
if (bean == null) {
throw new SystemException("Invalid arguments");
}
@@ -453,6 +457,7 @@ public class StatelessInstanceManager {
if (!data.closePool()) {
logger.error("Timed-out waiting for stateless pool to
close: for deployment '" + beanContext.getDeploymentID() + "'");
}
+
} catch (final InterruptedException e) {
Thread.interrupted();
}
@@ -502,12 +507,12 @@ public class StatelessInstanceManager {
}
private final class InstanceCreatorRunnable implements Runnable {
- private long maxAge;
- private long iteration;
- private double maxAgeOffset;
- private long min;
- private Data data;
- private StatelessSupplier supplier;
+ private final long maxAge;
+ private final long iteration;
+ private final double maxAgeOffset;
+ private final long min;
+ private final Data data;
+ private final StatelessSupplier supplier;
private InstanceCreatorRunnable(final long maxAge, final long
iteration, final long min, final double maxAgeOffset, final Data data,
final StatelessSupplier supplier) {
this.maxAge = maxAge;
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
---
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
(original)
+++
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/DaemonThreadFactory.java
Mon Jul 21 13:11:53 2014
@@ -31,7 +31,7 @@ public class DaemonThreadFactory impleme
private final String name;
private final ThreadGroup group;
- private AtomicInteger ids = new AtomicInteger(0);
+ private final AtomicInteger ids = new AtomicInteger(0);
public DaemonThreadFactory(final Object... name) {
this.name = join(" ", name).trim();
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
---
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
(original)
+++
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
Mon Jul 21 13:11:53 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
@@ -153,9 +154,30 @@ public class Pool<T> {
}
private Executor createExecutor() {
- return new ThreadPoolExecutor(3, 10,
+ final ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(3, 10,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2), new
DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode()));
+
+ threadPoolExecutor.setRejectedExecutionHandler(new
RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
+ return;
+ }
+
+ try {
+ if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) {
+
org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB,
"org.apache.openejb.util.resources")
+ .warning("Default pool executor failed to run
asynchronous process: " + r);
+ }
+ } catch (final InterruptedException e) {
+ //Ignore
+ }
+ }
+ });
+
+ return threadPoolExecutor;
}
private void greater(final String maxName, final long max, final
String minName, final long min) {
@@ -420,6 +442,7 @@ public class Pool<T> {
}
public boolean close(final long timeout, final TimeUnit unit) throws
InterruptedException {
+
// drain all keys so no new instances will be accepted into the
pool
while (instances.tryAcquire()) {
Thread.yield();
Modified:
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
URL:
http://svn.apache.org/viewvc/tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java?rev=1612260&r1=1612259&r2=1612260&view=diff
==============================================================================
---
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
(original)
+++
tomee/tomee/branches/tomee-1.7.x/container/openejb-core/src/test/java/org/apache/openejb/core/stateless/StatelessInstanceManagerPoolingTest.java
Mon Jul 21 13:11:53 2014
@@ -34,7 +34,6 @@ import javax.ejb.Remote;
import javax.ejb.Stateless;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -98,7 +97,6 @@ public class StatelessInstanceManagerPoo
final int count = 50;
final CountDownLatch invocations = new CountDownLatch(count);
final InitialContext ctx = new InitialContext();
- final ArrayList<Thread> threads = new ArrayList<Thread>(count);
// 'count' instances should be created and discarded.
for (int i = 0; i < count; i++) {
@@ -113,24 +111,32 @@ public class StatelessInstanceManagerPoo
}
final Counter counter = (Counter) object;
assertNotNull(counter);
- try {
- counter.explode();
- } catch (final Exception e) {
- invocations.countDown();
+
+ boolean run = true;
+
+ while (run) {
+ try {
+ counter.explode();
+ } catch (final
javax.ejb.ConcurrentAccessTimeoutException e) {
+ //Try again in moment...
+ try {
+ Thread.sleep(10);
+ } catch (final InterruptedException ie) {
+ //Ignore
+ }
+ } catch (final Exception e) {
+ invocations.countDown();
+ run = false;
+ }
}
}
}, "test-thread-" + count);
thread.setDaemon(false);
- threads.add(thread);
thread.start();
}
- for (final Thread thread : threads) {
- thread.join();
- }
-
- final boolean success = invocations.await(160, TimeUnit.SECONDS);
+ final boolean success = invocations.await(20, TimeUnit.SECONDS);
assertTrue("invocations timeout -> invocations.getCount() == " +
invocations.getCount(), success);
@@ -265,13 +271,9 @@ public class StatelessInstanceManagerPoo
return instances.get();
}
- public int discardCount() {
- return discardedInstances.get();
- }
-
public void explode() {
- discardedInstances.incrementAndGet();
- throw new NullPointerException("Test expected this null
pointer");
+ final int i = discardedInstances.incrementAndGet();
+ throw new NullPointerException("Test expected this null
pointer: " + i);
}
public void race(final CountDownLatch ready, final CountDownLatch
go) {