Removing shared mdb/stateless code. We will use private classes for the mdb container like we originally have done with the stateless container.
(cherry picked from commit 34a159fbafa0bddcf9e0f8ca37700537e5fb3887) Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/4fc6f325 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/4fc6f325 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/4fc6f325 Branch: refs/heads/tomee-1.7.x Commit: 4fc6f325b184e018c2345f6d3c6a3fe7037c7e81 Parents: 5ac6feb Author: Thiago Veronezi <[email protected]> Authored: Wed Jan 17 10:11:46 2018 -0500 Committer: Thiago Veronezi <[email protected]> Committed: Wed Jan 17 13:16:55 2018 -0500 ---------------------------------------------------------------------- .../core/instance/InstanceCreatorRunnable.java | 41 -- .../openejb/core/instance/InstanceManager.java | 358 ---------------- .../core/instance/InstanceManagerData.java | 79 ---- .../openejb/core/mdb/MdbInstanceManager.java | 372 +++++++++++++++- .../apache/openejb/core/stateless/Instance.java | 48 +++ .../core/stateless/StatelessContainer.java | 1 - .../stateless/StatelessInstanceManager.java | 422 +++++++++++++++++-- 7 files changed, 808 insertions(+), 513 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java deleted file mode 100644 index d87c330..0000000 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceCreatorRunnable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.openejb.core.instance; - -import org.apache.openejb.core.mdb.Instance; - -public final class InstanceCreatorRunnable implements Runnable { - - private final InstanceManagerData data; - private final InstanceManager.InstanceSupplier supplier; - private final long offset; - - public InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset, - final InstanceManagerData data, final InstanceManager.InstanceSupplier supplier) { - this.data = data; - this.supplier = supplier; - this.offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l; - } - - @Override - public void run() { - final Instance obj = supplier.create(); - if (obj != null) { - data.getPool().add(obj, offset); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java deleted file mode 100644 index 8043a55..0000000 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManager.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openejb.core.instance; - -import org.apache.openejb.ApplicationException; -import org.apache.openejb.BeanContext; -import org.apache.openejb.OpenEJBException; -import org.apache.openejb.SystemException; -import org.apache.openejb.cdi.CdiEjbBean; -import org.apache.openejb.core.InstanceContext; -import org.apache.openejb.core.Operation; -import org.apache.openejb.core.ThreadContext; -import org.apache.openejb.core.interceptor.InterceptorData; -import org.apache.openejb.core.interceptor.InterceptorStack; -import org.apache.openejb.core.mdb.Instance; -import org.apache.openejb.loader.Options; -import org.apache.openejb.monitoring.LocalMBeanServer; -import org.apache.openejb.util.DaemonThreadFactory; -import org.apache.openejb.util.Duration; -import org.apache.openejb.util.LogCategory; -import org.apache.openejb.util.Logger; -import org.apache.openejb.util.Pool; - -import javax.ejb.ConcurrentAccessTimeoutException; -import javax.ejb.SessionBean; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.rmi.RemoteException; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -public abstract class InstanceManager { - protected static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); - protected static final Method removeSessionBeanMethod; - - static { // initialize it only once - Method foundRemoveMethod; - try { - foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove"); - } catch (final NoSuchMethodException e) { - foundRemoveMethod = null; - } - removeSessionBeanMethod = foundRemoveMethod; - } - - protected final Duration accessTimeout; - protected final Duration closeTimeout; - protected final Pool.Builder poolBuilder; - protected final ThreadPoolExecutor executor; - protected final ScheduledExecutorService scheduledExecutor; - - public InstanceManager(final Duration accessTimeout, final Duration closeTimeout, - final Pool.Builder poolBuilder, final int callbackThreads, - final ScheduledExecutorService ses) { - this.accessTimeout = accessTimeout; - this.closeTimeout = closeTimeout; - this.poolBuilder = poolBuilder; - this.scheduledExecutor = ses; - - if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) { - ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true); - } - - if (accessTimeout.getUnit() == null) { - accessTimeout.setUnit(TimeUnit.MILLISECONDS); - } - - final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1; - final ThreadFactory threadFactory = new DaemonThreadFactory("InstanceManagerPool.worker."); - this.executor = new ThreadPoolExecutor( - callbackThreads, callbackThreads * 2, - 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory); - - this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override - public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) { - - if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) { - return; - } - - try { - if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) { - logger.warning("Executor failed to run asynchronous process: " + r); - } - } catch (final InterruptedException e) { - //Ignore - } - } - }); - } - - protected final class InstanceSupplier implements Pool.Supplier<Instance> { - private final BeanContext beanContext; - - public InstanceSupplier(final BeanContext beanContext) { - this.beanContext = beanContext; - } - - @Override - public void discard(final Instance instance, final Pool.Event reason) { - - final ThreadContext ctx = new ThreadContext(beanContext, null); - final ThreadContext oldCallContext = ThreadContext.enter(ctx); - try { - freeInstance(ctx, instance); - } finally { - ThreadContext.exit(oldCallContext); - } - } - - @Override - public Instance create() { - final ThreadContext ctx = new ThreadContext(beanContext, null); - final ThreadContext oldCallContext = ThreadContext.enter(ctx); - try { - return createInstance(ctx.getBeanContext()); - } catch (final OpenEJBException e) { - logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e); - } finally { - ThreadContext.exit(oldCallContext); - } - return null; - } - } - - public void destroy() { - if (executor != null) { - executor.shutdown(); - try { - if (!executor.awaitTermination(10000, MILLISECONDS)) { - java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (scheduledExecutor != null) { - scheduledExecutor.shutdown(); - try { - if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) { - java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - /** - * Removes an instance from the pool and returns it for use - * by the container in business methods. - * <p/> - * If the pool is at it's limit the StrictPooling flag will - * cause this thread to wait. - * <p/> - * If StrictPooling is not enabled this method will create a - * new bean instance performing all required injection - * and callbacks before returning it in a method ready state. - * - * @param callContext ThreadContext - * @return Object - * @throws OpenEJBException - */ - public Instance getInstance(final ThreadContext callContext) throws OpenEJBException { - final BeanContext beanContext = callContext.getBeanContext(); - final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData(); - - Instance instance = null; - try { - final Pool<Instance>.Entry entry = data.poolPop(); - - if (entry != null) { - instance = entry.get(); - instance.setPoolEntry(entry); - } - } catch (final TimeoutException e) { - final String msg = "No instances available in Session Bean pool. Waited " + data.getAccessTimeout().toString(); - final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg); - timeoutException.fillInStackTrace(); - throw new ApplicationException(timeoutException); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw new OpenEJBException("Unexpected Interruption of current thread: ", e); - } - - if (null == instance) { - instance = createInstance(beanContext); - } - - return instance; - } - - private Instance createInstance(final BeanContext beanContext) throws ApplicationException { - try { - final InstanceContext context = beanContext.newInstance(); - return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext()); - - } catch (Throwable e) { - if (e instanceof InvocationTargetException) { - e = ((InvocationTargetException) e).getTargetException(); - } - final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e; - logger.error(t, e); - throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e)); - } - } - - /** - * All instances are removed from the pool in getInstance(...). They are only - * returned by the Container via this method under two circumstances. - * <p/> - * 1. The business method returns normally - * 2. The business method throws an application exception - * <p/> - * Instances are not returned to the pool if the business method threw a system - * exception. - * - * @param callContext ThreadContext - * @param bean Object - * @throws OpenEJBException - */ - public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException { - - if (bean == null) { - throw new SystemException("Invalid arguments"); - } - - final Instance instance = Instance.class.cast(bean); - final BeanContext beanContext = callContext.getBeanContext(); - final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData(); - final Pool<Instance> pool = data.getPool(); - - if (instance.getPoolEntry() != null) { - pool.push(instance.getPoolEntry()); - } else { - pool.push(instance); - } - } - - /** - * This method is called to release the semaphore in case of the business method - * throwing a system exception - * - * @param callContext ThreadContext - * @param bean Object - */ - public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException { - - if (bean == null) { - throw new SystemException("Invalid arguments"); - } - - final Instance instance = Instance.class.cast(bean); - final BeanContext beanContext = callContext.getBeanContext(); - final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData(); - - if (null != data) { - final Pool<Instance> pool = data.getPool(); - pool.discard(instance.getPoolEntry()); - } - } - - @SuppressWarnings("unchecked") - private void freeInstance(final ThreadContext callContext, final Instance instance) { - try { - callContext.setCurrentOperation(Operation.PRE_DESTROY); - final BeanContext beanContext = callContext.getBeanContext(); - - final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null; - - final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors(); - final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors); - - final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class); - if (bean != null) { // TODO: see if it should be called before or after next call - bean.getInjectionTarget().preDestroy(instance.bean); - } - interceptorStack.invoke(); - - if (instance.creationalContext != null) { - instance.creationalContext.release(); - } - } catch (final Throwable re) { - logger.error("The bean instance " + instance + " threw a system exception:" + re, re); - } - - } - - protected void setDefault(final Duration duration, final TimeUnit unit) { - if (duration.getUnit() == null) { - duration.setUnit(unit); - } - } - - protected Duration getDuration(final Options options, final String property, final Duration defaultValue, final TimeUnit defaultUnit) { - final String s = options.get(property, defaultValue.toString()); - final Duration duration = new Duration(s); - if (duration.getUnit() == null) { - duration.setUnit(defaultUnit); - } - return duration; - } - - public void undeploy(final BeanContext beanContext) { - final InstanceManagerData data = (InstanceManagerData) beanContext.getContainerData(); - if (data == null) { - return; - } - - final MBeanServer server = LocalMBeanServer.get(); - for (final ObjectName objectName : data.getJmxNames()) { - try { - server.unregisterMBean(objectName); - } catch (final Exception e) { - logger.error("Unable to unregister MBean " + objectName); - } - } - - try { - if (!data.closePool()) { - logger.error("Timed-out waiting for instance manager pool to close: for deployment '" + beanContext.getDeploymentID() + "'"); - } - - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - - beanContext.setContainerData(null); - } - -} http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java b/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java deleted file mode 100644 index a68b324..0000000 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/instance/InstanceManagerData.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openejb.core.instance; - -import org.apache.openejb.core.BaseContext; -import org.apache.openejb.core.mdb.Instance; -import org.apache.openejb.util.Duration; -import org.apache.openejb.util.Pool; - -import javax.management.ObjectName; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; - -public class InstanceManagerData { - - private final Pool<Instance> pool; - private final Duration accessTimeout; - private final Duration closeTimeout; - private final List<ObjectName> jmxNames = new ArrayList<ObjectName>(); - private BaseContext baseContext; - - public InstanceManagerData(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) { - this.pool = pool; - this.accessTimeout = accessTimeout; - this.closeTimeout = closeTimeout; - } - - public Duration getAccessTimeout() { - return accessTimeout; - } - - public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException { - return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit()); - } - - public Pool<Instance> getPool() { - return pool; - } - public void flush() { - this.pool.flush(); - } - - public boolean closePool() throws InterruptedException { - return pool.close(closeTimeout.getTime(), closeTimeout.getUnit()); - } - - public ObjectName add(final ObjectName name) { - jmxNames.add(name); - return name; - } - - public List<ObjectName> getJmxNames() { - return jmxNames; - } - - public BaseContext getBaseContext() { - return baseContext; - } - - public void setBaseContext(BaseContext baseContext) { - this.baseContext = baseContext; - } -} http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java index d1fcd6c..a74dd03 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java @@ -17,23 +17,34 @@ package org.apache.openejb.core.mdb; +import org.apache.openejb.ApplicationException; import org.apache.openejb.BeanContext; import org.apache.openejb.OpenEJBException; -import org.apache.openejb.core.instance.InstanceCreatorRunnable; -import org.apache.openejb.core.instance.InstanceManager; -import org.apache.openejb.core.instance.InstanceManagerData; +import org.apache.openejb.SystemException; +import org.apache.openejb.cdi.CdiEjbBean; +import org.apache.openejb.core.BaseContext; +import org.apache.openejb.core.InstanceContext; +import org.apache.openejb.core.Operation; +import org.apache.openejb.core.ThreadContext; +import org.apache.openejb.core.interceptor.InterceptorData; +import org.apache.openejb.core.interceptor.InterceptorStack; import org.apache.openejb.loader.Options; import org.apache.openejb.monitoring.LocalMBeanServer; import org.apache.openejb.monitoring.ManagedMBean; import org.apache.openejb.monitoring.ObjectNameBuilder; import org.apache.openejb.monitoring.StatsInterceptor; import org.apache.openejb.spi.SecurityService; +import org.apache.openejb.util.DaemonThreadFactory; import org.apache.openejb.util.Duration; +import org.apache.openejb.util.LogCategory; +import org.apache.openejb.util.Logger; import org.apache.openejb.util.PassthroughFactory; import org.apache.openejb.util.Pool; import org.apache.xbean.recipe.ObjectRecipe; import org.apache.xbean.recipe.Option; +import javax.ejb.ConcurrentAccessTimeoutException; +import javax.ejb.SessionBean; import javax.management.Attribute; import javax.management.AttributeList; import javax.management.AttributeNotFoundException; @@ -56,18 +67,47 @@ import javax.resource.spi.ActivationSpec; import javax.resource.spi.ResourceAdapter; import java.io.Flushable; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static javax.management.MBeanOperationInfo.ACTION; -public class MdbInstanceManager extends InstanceManager { +public class MdbInstanceManager { + protected static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); + protected static final Method removeSessionBeanMethod; + + static { // initialize it only once + Method foundRemoveMethod; + try { + foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove"); + } catch (final NoSuchMethodException e) { + foundRemoveMethod = null; + } + removeSessionBeanMethod = foundRemoveMethod; + } + + private final Duration accessTimeout; + private final Duration closeTimeout; + private final Pool.Builder poolBuilder; + private final ThreadPoolExecutor executor; + private final ScheduledExecutorService scheduledExecutor; private final Map<BeanContext, MdbPoolContainer.MdbActivationContext> activationContexts = new ConcurrentHashMap<>(); private final Map<BeanContext, ObjectName> mbeanNames = new ConcurrentHashMap<>(); @@ -84,7 +124,43 @@ public class MdbInstanceManager extends InstanceManager { final Duration accessTimeout, final Duration closeTimeout, final Pool.Builder poolBuilder, final int callbackThreads, final ScheduledExecutorService ses) { - super(accessTimeout, closeTimeout, poolBuilder, callbackThreads, ses); + this.accessTimeout = accessTimeout; + this.closeTimeout = closeTimeout; + this.poolBuilder = poolBuilder; + this.scheduledExecutor = ses; + + if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) { + ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true); + } + + if (accessTimeout.getUnit() == null) { + accessTimeout.setUnit(TimeUnit.MILLISECONDS); + } + + final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1; + final ThreadFactory threadFactory = new DaemonThreadFactory("InstanceManagerPool.worker."); + this.executor = new ThreadPoolExecutor( + callbackThreads, callbackThreads * 2, + 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory); + + this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) { + + if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) { + return; + } + + try { + if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) { + logger.warning("Executor failed to run asynchronous process: " + r); + } + } catch (final InterruptedException e) { + //Ignore + } + } + }); + this.securityService = securityService; this.resourceAdapter = resourceAdapter; this.inboundRecovery = inboundRecovery; @@ -93,7 +169,7 @@ public class MdbInstanceManager extends InstanceManager { public void deploy(final BeanContext beanContext, final ActivationSpec activationSpec, final EndpointFactory endpointFactory) - throws OpenEJBException{ + throws OpenEJBException { if (inboundRecovery != null) { inboundRecovery.recover(resourceAdapter, activationSpec, containerID.toString()); } @@ -118,7 +194,7 @@ public class MdbInstanceManager extends InstanceManager { final long maxAge = builder.getMaxAge().getTime(TimeUnit.MILLISECONDS); final double maxAgeOffset = builder.getMaxAgeOffset(); - final InstanceManagerData data = new InstanceManagerData(builder.build(), accessTimeout, closeTimeout); + final Data data = new Data(builder.build(), accessTimeout, closeTimeout); MdbContext mdbContext = new MdbContext(securityService, new Flushable() { @Override @@ -182,7 +258,7 @@ public class MdbInstanceManager extends InstanceManager { String jmxName = beanContext.getActivationProperties().get("MdbJMXControl"); if (jmxName == null) { - jmxName = "true"; + jmxName = "true"; } addJMxControl(beanContext, jmxName, activationContext); @@ -209,7 +285,7 @@ public class MdbInstanceManager extends InstanceManager { data.getPool().start(); } - public void undeploy(final BeanContext beanContext){ + public void undeploy(final BeanContext beanContext) { final MdbPoolContainer.MdbActivationContext actContext = activationContexts.get(beanContext); if (actContext == null) { return; @@ -335,4 +411,282 @@ public class MdbInstanceManager extends InstanceManager { return ATTRIBUTE_LIST; } } + + private final class InstanceSupplier implements Pool.Supplier<Instance> { + private final BeanContext beanContext; + + public InstanceSupplier(final BeanContext beanContext) { + this.beanContext = beanContext; + } + + @Override + public void discard(final Instance instance, final Pool.Event reason) { + + final ThreadContext ctx = new ThreadContext(beanContext, null); + final ThreadContext oldCallContext = ThreadContext.enter(ctx); + try { + freeInstance(ctx, instance); + } finally { + ThreadContext.exit(oldCallContext); + } + } + + @Override + public Instance create() { + final ThreadContext ctx = new ThreadContext(beanContext, null); + final ThreadContext oldCallContext = ThreadContext.enter(ctx); + try { + return createInstance(ctx.getBeanContext()); + } catch (final OpenEJBException e) { + logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e); + } finally { + ThreadContext.exit(oldCallContext); + } + return null; + } + } + + public void destroy() { + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + try { + if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Removes an instance from the pool and returns it for use + * by the container in business methods. + * <p/> + * If the pool is at it's limit the StrictPooling flag will + * cause this thread to wait. + * <p/> + * If StrictPooling is not enabled this method will create a + * new bean instance performing all required injection + * and callbacks before returning it in a method ready state. + * + * @param callContext ThreadContext + * @return Object + * @throws OpenEJBException + */ + public Instance getInstance(final ThreadContext callContext) throws OpenEJBException { + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + + Instance instance = null; + try { + final Pool<Instance>.Entry entry = data.poolPop(); + + if (entry != null) { + instance = entry.get(); + instance.setPoolEntry(entry); + } + } catch (final TimeoutException e) { + final String msg = "No instances available in Session Bean pool. Waited " + data.getAccessTimeout().toString(); + final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg); + timeoutException.fillInStackTrace(); + throw new ApplicationException(timeoutException); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OpenEJBException("Unexpected Interruption of current thread: ", e); + } + + if (null == instance) { + instance = createInstance(beanContext); + } + + return instance; + } + + private Instance createInstance(final BeanContext beanContext) throws ApplicationException { + try { + final InstanceContext context = beanContext.newInstance(); + return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext()); + + } catch (Throwable e) { + if (e instanceof InvocationTargetException) { + e = ((InvocationTargetException) e).getTargetException(); + } + final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e; + logger.error(t, e); + throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e)); + } + } + + /** + * All instances are removed from the pool in getInstance(...). They are only + * returned by the Container via this method under two circumstances. + * <p/> + * 1. The business method returns normally + * 2. The business method throws an application exception + * <p/> + * Instances are not returned to the pool if the business method threw a system + * exception. + * + * @param callContext ThreadContext + * @param bean Object + * @throws OpenEJBException + */ + public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException { + + if (bean == null) { + throw new SystemException("Invalid arguments"); + } + + final Instance instance = Instance.class.cast(bean); + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + final Pool<Instance> pool = data.getPool(); + + if (instance.getPoolEntry() != null) { + pool.push(instance.getPoolEntry()); + } else { + pool.push(instance); + } + } + + /** + * This method is called to release the semaphore in case of the business method + * throwing a system exception + * + * @param callContext ThreadContext + * @param bean Object + */ + public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException { + + if (bean == null) { + throw new SystemException("Invalid arguments"); + } + + final Instance instance = Instance.class.cast(bean); + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + + if (null != data) { + final Pool<Instance> pool = data.getPool(); + pool.discard(instance.getPoolEntry()); + } + } + + @SuppressWarnings("unchecked") + private void freeInstance(final ThreadContext callContext, final Instance instance) { + try { + callContext.setCurrentOperation(Operation.PRE_DESTROY); + final BeanContext beanContext = callContext.getBeanContext(); + + final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null; + + final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors(); + final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors); + + final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class); + if (bean != null) { // TODO: see if it should be called before or after next call + bean.getInjectionTarget().preDestroy(instance.bean); + } + interceptorStack.invoke(); + + if (instance.creationalContext != null) { + instance.creationalContext.release(); + } + } catch (final Throwable re) { + logger.error("The bean instance " + instance + " threw a system exception:" + re, re); + } + + } + + private void setDefault(final Duration duration, final TimeUnit unit) { + if (duration.getUnit() == null) { + duration.setUnit(unit); + } + } + + private final class InstanceCreatorRunnable implements Runnable { + + private final Data data; + private final InstanceSupplier supplier; + private final long offset; + + public InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset, + final Data data, final InstanceSupplier supplier) { + this.data = data; + this.supplier = supplier; + this.offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l; + } + + @Override + public void run() { + final Instance obj = supplier.create(); + if (obj != null) { + data.getPool().add(obj, offset); + } + } + } + + private class Data { + + private final Pool<Instance> pool; + private final Duration accessTimeout; + private final Duration closeTimeout; + private final List<ObjectName> jmxNames = new ArrayList<ObjectName>(); + private BaseContext baseContext; + + public Data(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) { + this.pool = pool; + this.accessTimeout = accessTimeout; + this.closeTimeout = closeTimeout; + } + + public Duration getAccessTimeout() { + return accessTimeout; + } + + public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException { + return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit()); + } + + public Pool<Instance> getPool() { + return pool; + } + + public void flush() { + this.pool.flush(); + } + + public boolean closePool() throws InterruptedException { + return pool.close(closeTimeout.getTime(), closeTimeout.getUnit()); + } + + public ObjectName add(final ObjectName name) { + jmxNames.add(name); + return name; + } + + public List<ObjectName> getJmxNames() { + return jmxNames; + } + + public BaseContext getBaseContext() { + return baseContext; + } + + public void setBaseContext(BaseContext baseContext) { + this.baseContext = baseContext; + } + } + } http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java new file mode 100644 index 0000000..17bbf20 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/Instance.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openejb.core.stateless; + +import org.apache.openejb.util.Pool; + +import javax.enterprise.context.spi.CreationalContext; +import java.util.Map; + +/** + * @version $Rev$ $Date$ + */ +public class Instance { + public final Object bean; + public final Map<String, Object> interceptors; + public final CreationalContext creationalContext; + + private Pool<Instance>.Entry poolEntry; + + public Instance(final Object bean, final Map<String, Object> interceptors, final CreationalContext creationalContext) { + this.bean = bean; + this.interceptors = interceptors; + this.creationalContext = creationalContext; + } + + public Pool<Instance>.Entry getPoolEntry() { + return poolEntry; + } + + public void setPoolEntry(final Pool<Instance>.Entry poolEntry) { + this.poolEntry = poolEntry; + } +} http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java index 03a5dfd..5b4ee4a 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessContainer.java @@ -29,7 +29,6 @@ import org.apache.openejb.core.Operation; import org.apache.openejb.core.ThreadContext; import org.apache.openejb.core.interceptor.InterceptorData; import org.apache.openejb.core.interceptor.InterceptorStack; -import org.apache.openejb.core.mdb.Instance; import org.apache.openejb.core.security.AbstractSecurityService; import org.apache.openejb.core.timer.EjbTimerService; import org.apache.openejb.core.transaction.TransactionPolicy; http://git-wip-us.apache.org/repos/asf/tomee/blob/4fc6f325/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java index eb41196..9d1ee5c 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/stateless/StatelessInstanceManager.java @@ -17,12 +17,17 @@ package org.apache.openejb.core.stateless; +import org.apache.openejb.ApplicationException; import org.apache.openejb.BeanContext; import org.apache.openejb.OpenEJBException; -import org.apache.openejb.core.instance.InstanceCreatorRunnable; -import org.apache.openejb.core.instance.InstanceManager; -import org.apache.openejb.core.instance.InstanceManagerData; +import org.apache.openejb.SystemException; +import org.apache.openejb.cdi.CdiEjbBean; +import org.apache.openejb.core.InstanceContext; +import org.apache.openejb.core.Operation; +import org.apache.openejb.core.ThreadContext; +import org.apache.openejb.core.interceptor.InterceptorData; import org.apache.openejb.core.interceptor.InterceptorInstance; +import org.apache.openejb.core.interceptor.InterceptorStack; import org.apache.openejb.core.timer.TimerServiceWrapper; import org.apache.openejb.loader.Options; import org.apache.openejb.monitoring.LocalMBeanServer; @@ -30,46 +35,313 @@ import org.apache.openejb.monitoring.ManagedMBean; import org.apache.openejb.monitoring.ObjectNameBuilder; import org.apache.openejb.monitoring.StatsInterceptor; import org.apache.openejb.spi.SecurityService; +import org.apache.openejb.util.DaemonThreadFactory; import org.apache.openejb.util.Duration; +import org.apache.openejb.util.LogCategory; +import org.apache.openejb.util.Logger; import org.apache.openejb.util.PassthroughFactory; import org.apache.openejb.util.Pool; import org.apache.xbean.recipe.ObjectRecipe; import org.apache.xbean.recipe.Option; +import javax.ejb.ConcurrentAccessTimeoutException; import javax.ejb.EJBContext; +import javax.ejb.SessionBean; +import javax.ejb.SessionContext; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.naming.Context; import javax.naming.NamingException; import java.io.Flushable; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; -public class StatelessInstanceManager extends InstanceManager { +import static java.util.concurrent.TimeUnit.MILLISECONDS; +public class StatelessInstanceManager { + private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); + private static final Method removeSessionBeanMethod; + + static { // initialize it only once + Method foundRemoveMethod; + try { + foundRemoveMethod = SessionBean.class.getDeclaredMethod("ejbRemove"); + } catch (final NoSuchMethodException e) { + foundRemoveMethod = null; + } + removeSessionBeanMethod = foundRemoveMethod; + } + + private final Duration accessTimeout; + private final Duration closeTimeout; private final SecurityService securityService; + private final Pool.Builder poolBuilder; + private final ThreadPoolExecutor executor; + private final ScheduledExecutorService scheduledExecutor; public StatelessInstanceManager(final SecurityService securityService, final Duration accessTimeout, final Duration closeTimeout, final Pool.Builder poolBuilder, final int callbackThreads, final ScheduledExecutorService ses) { - super(accessTimeout, closeTimeout, poolBuilder, callbackThreads, ses); this.securityService = securityService; + this.accessTimeout = accessTimeout; + this.closeTimeout = closeTimeout; + this.poolBuilder = poolBuilder; + this.scheduledExecutor = ses; + if (ScheduledThreadPoolExecutor.class.isInstance(ses) && !ScheduledThreadPoolExecutor.class.cast(ses).getRemoveOnCancelPolicy()) { + ScheduledThreadPoolExecutor.class.cast(ses).setRemoveOnCancelPolicy(true); + } + + if (accessTimeout.getUnit() == null) { + accessTimeout.setUnit(TimeUnit.MILLISECONDS); + } + + final int qsize = callbackThreads > 1 ? callbackThreads - 1 : 1; + final ThreadFactory threadFactory = new DaemonThreadFactory("StatelessPool.worker."); + this.executor = new ThreadPoolExecutor( + callbackThreads, callbackThreads * 2, + 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(qsize), threadFactory); + + this.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) { + + if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) { + return; + } + + try { + if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) { + logger.warning("Executor failed to run asynchronous process: " + r); + } + } catch (final InterruptedException e) { + //Ignore + } + } + }); } + private final class StatelessSupplier implements Pool.Supplier<Instance> { + private final BeanContext beanContext; + + private StatelessSupplier(final BeanContext beanContext) { + this.beanContext = beanContext; + } + + @Override + public void discard(final Instance instance, final Pool.Event reason) { + + final ThreadContext ctx = new ThreadContext(beanContext, null); + final ThreadContext oldCallContext = ThreadContext.enter(ctx); + try { + freeInstance(ctx, instance); + } finally { + ThreadContext.exit(oldCallContext); + } + } + + @Override + public Instance create() { + final ThreadContext ctx = new ThreadContext(beanContext, null); + final ThreadContext oldCallContext = ThreadContext.enter(ctx); + try { + return createInstance(ctx, ctx.getBeanContext()); + } catch (final OpenEJBException e) { + logger.error("Unable to fill pool: for deployment '" + beanContext.getDeploymentID() + "'", e); + } finally { + ThreadContext.exit(oldCallContext); + } + return null; + } + } + + public void destroy() { + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.interrupted(); + } + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + try { + if (!scheduledExecutor.awaitTermination(10000, MILLISECONDS)) { + java.util.logging.Logger.getLogger(this.getClass().getName()).log(Level.WARNING, getClass().getSimpleName() + " pool timeout expired"); + } + } catch (final InterruptedException e) { + Thread.interrupted(); + } + } + } + + /** + * Removes an instance from the pool and returns it for use + * by the container in business methods. + * <p/> + * If the pool is at it's limit the StrictPooling flag will + * cause this thread to wait. + * <p/> + * If StrictPooling is not enabled this method will create a + * new stateless bean instance performing all required injection + * and callbacks before returning it in a method ready state. + * + * @param callContext ThreadContext + * @return Object + * @throws OpenEJBException + */ + public Instance getInstance(final ThreadContext callContext) throws OpenEJBException { + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + + Instance instance = null; + try { + final Pool<Instance>.Entry entry = data.poolPop(); + + if (entry != null) { + instance = entry.get(); + instance.setPoolEntry(entry); + } + } catch (final TimeoutException e) { + final String msg = "No instances available in Stateless Session Bean pool. Waited " + data.accessTimeout.toString(); + final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg); + timeoutException.fillInStackTrace(); + throw new ApplicationException(timeoutException); + } catch (final InterruptedException e) { + Thread.interrupted(); + throw new OpenEJBException("Unexpected Interruption of current thread: ", e); + } + + if (null == instance) { + instance = createInstance(callContext, beanContext); + } + + return instance; + } + + private Instance createInstance(final ThreadContext callContext, final BeanContext beanContext) throws ApplicationException { + try { + final InstanceContext context = beanContext.newInstance(); + return new Instance(context.getBean(), context.getInterceptors(), context.getCreationalContext()); + + } catch (Throwable e) { + if (e instanceof InvocationTargetException) { + e = ((InvocationTargetException) e).getTargetException(); + } + final String t = "The bean instance " + beanContext.getDeploymentID() + " threw a system exception:" + e; + logger.error(t, e); + throw new ApplicationException(new RemoteException("Cannot obtain a free instance.", e)); + } + } + + /** + * All instances are removed from the pool in getInstance(...). They are only + * returned by the StatelessContainer via this method under two circumstances. + * <p/> + * 1. The business method returns normally + * 2. The business method throws an application exception + * <p/> + * Instances are not returned to the pool if the business method threw a system + * exception. + * + * @param callContext ThreadContext + * @param bean Object + * @throws OpenEJBException + */ + public void poolInstance(final ThreadContext callContext, final Object bean) throws OpenEJBException { + + if (bean == null) { + throw new SystemException("Invalid arguments"); + } + + final Instance instance = Instance.class.cast(bean); + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + final Pool<Instance> pool = data.getPool(); + + if (instance.getPoolEntry() != null) { + pool.push(instance.getPoolEntry()); + } else { + pool.push(instance); + } + } + + /** + * This method is called to release the semaphore in case of the business method + * throwing a system exception + * + * @param callContext ThreadContext + * @param bean Object + */ + public void discardInstance(final ThreadContext callContext, final Object bean) throws SystemException { + + if (bean == null) { + throw new SystemException("Invalid arguments"); + } + + final Instance instance = Instance.class.cast(bean); + final BeanContext beanContext = callContext.getBeanContext(); + final Data data = (Data) beanContext.getContainerData(); + + if (null != data) { + final Pool<Instance> pool = data.getPool(); + pool.discard(instance.getPoolEntry()); + } + } + + @SuppressWarnings("unchecked") + private void freeInstance(final ThreadContext callContext, final Instance instance) { + try { + callContext.setCurrentOperation(Operation.PRE_DESTROY); + final BeanContext beanContext = callContext.getBeanContext(); + + final Method remove = instance.bean instanceof SessionBean ? removeSessionBeanMethod : null; + + final List<InterceptorData> callbackInterceptors = beanContext.getCallbackInterceptors(); + final InterceptorStack interceptorStack = new InterceptorStack(instance.bean, remove, Operation.PRE_DESTROY, callbackInterceptors, instance.interceptors); + + final CdiEjbBean<Object> bean = beanContext.get(CdiEjbBean.class); + if (bean != null) { // TODO: see if it should be called before or after next call + bean.getInjectionTarget().preDestroy(instance.bean); + } + interceptorStack.invoke(); + + if (instance.creationalContext != null) { + instance.creationalContext.release(); + } + } catch (final Throwable re) { + logger.error("The bean instance " + instance + " threw a system exception:" + re, re); + } + + } @SuppressWarnings("unchecked") public void deploy(final BeanContext beanContext) throws OpenEJBException { final Options options = new Options(beanContext.getProperties()); final Duration accessTimeout = getDuration( - options, - "AccessTimeout", - getDuration(options, "Timeout", this.accessTimeout, TimeUnit.MILLISECONDS), // default timeout - TimeUnit.MILLISECONDS + options, + "AccessTimeout", + getDuration(options, "Timeout", this.accessTimeout, TimeUnit.MILLISECONDS), // default timeout + TimeUnit.MILLISECONDS ); final Duration closeTimeout = getDuration(options, "CloseTimeout", this.closeTimeout, TimeUnit.MINUTES); @@ -84,29 +356,19 @@ public class StatelessInstanceManager extends InstanceManager { setDefault(builder.getIdleTimeout(), TimeUnit.MINUTES); setDefault(builder.getInterval(), TimeUnit.MINUTES); - final InstanceSupplier supplier = new InstanceSupplier(beanContext); + final StatelessSupplier supplier = new StatelessSupplier(beanContext); builder.setSupplier(supplier); builder.setExecutor(executor); - - final InstanceManagerData data = new InstanceManagerData(builder.build(), accessTimeout, closeTimeout); - - StatelessContext statelessContext = new StatelessContext(securityService, new Flushable() { - @Override - public void flush() throws IOException { - data.flush(); - } - }); - data.setBaseContext(statelessContext); - + final Data data = new Data(builder.build(), accessTimeout, closeTimeout); beanContext.setContainerData(data); - beanContext.set(EJBContext.class, data.getBaseContext()); + beanContext.set(EJBContext.class, data.sessionContext); try { final Context context = beanContext.getJndiEnc(); - context.bind("comp/EJBContext", data.getBaseContext()); - context.bind("comp/WebServiceContext", new EjbWsContext(statelessContext)); + context.bind("comp/EJBContext", data.sessionContext); + context.bind("comp/WebServiceContext", new EjbWsContext(data.sessionContext)); context.bind("comp/TimerService", new TimerServiceWrapper()); } catch (final NamingException e) { throw new OpenEJBException("Failed to bind EJBContext/WebServiceContext/TimerService", e); @@ -157,7 +419,7 @@ public class StatelessInstanceManager extends InstanceManager { if (server.isRegistered(objectName)) { server.unregisterMBean(objectName); } - server.registerMBean(new ManagedMBean(data.getPool()), objectName); + server.registerMBean(new ManagedMBean(data.pool), objectName); data.add(objectName); } catch (final Exception e) { logger.error("Unable to register MBean ", e); @@ -179,4 +441,114 @@ public class StatelessInstanceManager extends InstanceManager { data.getPool().start(); } + + private void setDefault(final Duration duration, final TimeUnit unit) { + if (duration.getUnit() == null) { + duration.setUnit(unit); + } + } + + private Duration getDuration(final Options options, final String property, final Duration defaultValue, final TimeUnit defaultUnit) { + final String s = options.get(property, defaultValue.toString()); + final Duration duration = new Duration(s); + if (duration.getUnit() == null) { + duration.setUnit(defaultUnit); + } + return duration; + } + + public void undeploy(final BeanContext beanContext) { + final Data data = (Data) beanContext.getContainerData(); + if (data == null) { + return; + } + + final MBeanServer server = LocalMBeanServer.get(); + for (final ObjectName objectName : data.jmxNames) { + try { + server.unregisterMBean(objectName); + } catch (final Exception e) { + logger.error("Unable to unregister MBean " + objectName); + } + } + + try { + if (!data.closePool()) { + logger.error("Timed-out waiting for stateless pool to close: for deployment '" + beanContext.getDeploymentID() + "'"); + } + + } catch (final InterruptedException e) { + Thread.interrupted(); + } + + beanContext.setContainerData(null); + } + + private final class Data { + private final Pool<Instance> pool; + private final Duration accessTimeout; + private final Duration closeTimeout; + private final List<ObjectName> jmxNames = new ArrayList<ObjectName>(); + private final SessionContext sessionContext; + + private Data(final Pool<Instance> pool, final Duration accessTimeout, final Duration closeTimeout) { + this.pool = pool; + this.accessTimeout = accessTimeout; + this.closeTimeout = closeTimeout; + this.sessionContext = new StatelessContext(securityService, new Flushable() { + @Override + public void flush() throws IOException { + getPool().flush(); + } + }); + } + + public Duration getAccessTimeout() { + return accessTimeout; + } + + public Pool<Instance>.Entry poolPop() throws InterruptedException, TimeoutException { + return pool.pop(accessTimeout.getTime(), accessTimeout.getUnit()); + } + + public Pool<Instance> getPool() { + return pool; + } + + public boolean closePool() throws InterruptedException { + return pool.close(closeTimeout.getTime(), closeTimeout.getUnit()); + } + + public ObjectName add(final ObjectName name) { + jmxNames.add(name); + return name; + } + } + + private final class InstanceCreatorRunnable implements Runnable { + private final long maxAge; + private final long iteration; + private final double maxAgeOffset; + private final long min; + private final Data data; + private final StatelessSupplier supplier; + + private InstanceCreatorRunnable(final long maxAge, final long iteration, final long min, final double maxAgeOffset, final Data data, final StatelessSupplier supplier) { + this.maxAge = maxAge; + this.iteration = iteration; + this.min = min; + this.maxAgeOffset = maxAgeOffset; + this.data = data; + this.supplier = supplier; + } + + @Override + public void run() { + final Instance obj = supplier.create(); + if (obj != null) { + final long offset = maxAge > 0 ? (long) (maxAge / maxAgeOffset * min * iteration) % maxAge : 0l; + data.getPool().add(obj, offset); + } + } + } }
